Skip to content

Commit

Permalink
Merge pull request #75 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.5.0.31
  • Loading branch information
PalNilsson committed Feb 21, 2023
2 parents 309c5ca + 2734dfc commit 23a777a
Show file tree
Hide file tree
Showing 25 changed files with 952 additions and 42 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ jobs:
python-version: ['3.7']
steps:
- name: Checkout Pilot 3 repo
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
FLAKE8_CONFIG: ".flake8"
steps:
- name: Checkout Pilot3 repo
uses: actions/checkout@v2
uses: actions/checkout@v3

# - name: Hack me some python
# run: |
Expand All @@ -27,7 +27,7 @@ jobs:
# fi

- name: Setup Python3
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
architecture: x64
Expand Down
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.4.8.5
3.5.0.31
4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class ErrorCodes:
BADOUTPUTFILENAME = 1371
APPTAINERNOTINSTALLED = 1372
CERTIFICATEHASEXPIRED = 1373
REMOTEFILEDICTDOESNOTEXIST = 1374

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
Expand Down Expand Up @@ -292,7 +293,8 @@ class ErrorCodes:
FRONTIER: "Frontier error",
VOMSPROXYABOUTTOEXPIRE: "VOMS proxy is about to expire",
BADOUTPUTFILENAME: "Output file name contains illegal characters",
CERTIFICATEHASEXPIRED: "Certificate has expired"
CERTIFICATEHASEXPIRED: "Certificate has expired",
REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
Expand Down
16 changes: 9 additions & 7 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue
from pilot.util.realtimelogger import cleanup as rtcleanup
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model, get_cpu_cores

logger = logging.getLogger(__name__)
errors = ErrorCodes()
Expand Down Expand Up @@ -322,7 +322,7 @@ def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False)
final = is_final_update(job, state, tag='sending' if args.update_server else 'writing')

# build the data structure needed for updateJob
data = get_data_structure(job, state, args, xml=xml, metadata=metadata)
data = get_data_structure(job, state, args, xml=xml, metadata=metadata, final=final)
logger.debug(f'data={data}')

# write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode)
Expand Down Expand Up @@ -574,12 +574,12 @@ def add_data_structure_ids(data, version_tag, job):
else:
data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion)
else:
logger.debug('no pilotid')
logger.warning('pilotid not available')

return data


def get_data_structure(job, state, args, xml=None, metadata=None):
def get_data_structure(job, state, args, xml=None, metadata=None, final=False): # noqa: C901
"""
Build the data structure needed for updateJob.
Expand All @@ -588,6 +588,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
:param args: Pilot args object.
:param xml: optional XML string.
:param metadata: job report metadata read as a string.
:param final: is this for the final server update? (Boolean).
:return: data structure (dictionary).
"""

Expand Down Expand Up @@ -621,7 +622,6 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
# add the core count
if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
data['coreCount'] = job.corecount
#data['coreCount'] = mean(job.corecounts) if job.corecounts else job.corecount
if job.corecounts:
_mean = mean(job.corecounts)
logger.info(f'mean actualcorecount: {_mean}')
Expand All @@ -634,12 +634,14 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
else:
logger.info("payload/TRF did not report the number of read events")

# get the CU consumption time
# get the CPU consumption time
constime = get_cpu_consumption_time(job.cpuconsumptiontime)
if constime and constime != -1:
data['cpuConsumptionTime'] = constime
data['cpuConversionFactor'] = job.cpuconversionfactor
data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + get_cpu_model()
cpumodel = get_cpu_model()
cpumodel = get_cpu_cores(cpumodel) # add the CPU cores if not present
data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + cpumodel

instruction_sets = has_instruction_sets(['AVX2'])
product, vendor = get_display_info()
Expand Down
2 changes: 1 addition & 1 deletion pilot/info/filespec.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def is_directaccess(self, ensure_replica=True, allowed_replica_schemas=None):
is_rootfile = True
exclude_pattern = ['.tar.gz', '.lib.tgz', '.raw.']
for e in exclude_pattern:
if e in filename:
if e in filename or filename.startswith('raw.'):
is_rootfile = False
break

Expand Down
14 changes: 10 additions & 4 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#
# Authors:
# - Alexey Anisenkov, anisyonk@cern.ch, 2018-2019
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023
# - Wen Guan, wen.guan@cern.ch, 2018

"""
Expand Down Expand Up @@ -69,6 +69,7 @@ class JobData(BaseData):
realtimelogging = False # True for real-time logging (set by server/job definition/args)
pandasecrets = "" # User defined secrets
pilotsecrets = {} # Real-time logging secrets
requestid = None # Request ID

# set by the pilot (not from job definition)
workdir = "" # working directory for this job
Expand Down Expand Up @@ -160,7 +161,8 @@ class JobData(BaseData):

# specify the type of attributes for proper data validation and casting
_keys = {int: ['corecount', 'piloterrorcode', 'transexitcode', 'exitcode', 'cpuconversionfactor', 'exeerrorcode',
'attemptnr', 'nevents', 'neventsw', 'pid', 'cpuconsumptiontime', 'maxcpucount', 'actualcorecount'],
'attemptnr', 'nevents', 'neventsw', 'pid', 'cpuconsumptiontime', 'maxcpucount', 'actualcorecount',
'requestid'],
str: ['jobid', 'taskid', 'jobparams', 'transformation', 'destinationdblock', 'exeerrordiag'
'state', 'serverstate', 'workdir', 'stageout',
'platform', 'piloterrordiag', 'exitmsg', 'produserid', 'jobdefinitionid', 'writetofile',
Expand Down Expand Up @@ -302,7 +304,7 @@ def get_kmap():
# 'internal_name': 'ext_key_structure'
'lfn': 'inFiles',
##'??': 'dispatchDblock', '??define_proper_internal_name': 'dispatchDBlockToken',
'dataset': 'realDatasetsIn', 'guid': 'GUID',
'dataset': 'realDatasetsIn', 'guid': 'GUID', 'requestid': 'reqID',
'filesize': 'fsize', 'checksum': 'checksum', 'scope': 'scopeIn',
##'??define_internal_key': 'prodDBlocks',
'storage_token': 'prodDBlockToken',
Expand Down Expand Up @@ -1019,5 +1021,9 @@ def reset_errors(self): # temporary fix, make sure all queues are empty before
self.subprocesses = []

def to_json(self):
    """
    Serialize this object to a JSON string.

    Serialization falls back to each nested object's __dict__, so
    arbitrary attribute structures are handled without custom encoders.
    NOTE(review): non-object attributes that json cannot encode natively
    (e.g. sets) would still raise TypeError — confirm callers only pass
    plain attribute trees.

    :return: JSON representation of the object (string).
    """
    from json import dumps
    return dumps(self, default=lambda par: par.__dict__)
44 changes: 43 additions & 1 deletion pilot/scripts/open_remote_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
# - Paul Nilsson, paul.nilsson@cern.ch, 2020-2021

import argparse
import functools
import os
import logging
import threading
import queue
import ROOT
import signal
import threading
import traceback
from collections import namedtuple

from pilot.util.config import config
from pilot.util.filehandling import (
establish_logging,
write_json,
)
from pilot.util.processes import kill_processes

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,6 +148,41 @@ def spawn_file_open_thread(queues, file_list):
return thread


def register_signals(signals, args):
    """
    Register handlers for the given kill signals.

    Every signal in the list is routed to the interrupt() function,
    with the pilot args object bound as its first argument.

    :param signals: list of signals.
    :param args: pilot args.
    :return:
    """

    handler = functools.partial(interrupt, args)
    for _signal in signals:
        signal.signal(_signal, handler)


def interrupt(args, signum, frame):
    """
    Interrupt function on the receiving end of kill signals.
    This function is forwarded any incoming signals (SIGINT, SIGTERM, etc) and will
    shut down logging and kill the process tree of the current pid.

    :param args: pilot arguments (bound via functools.partial; not used here).
    :param signum: signal number.
    :param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
    :return:
    """

    # resolve a human-readable signal name; the previous implementation scanned
    # signal.__dict__ with the Python-2-only iteritems() (dead code in Python 3)
    # and could raise IndexError inside the handler if no name matched
    try:
        sig = signal.Signals(signum).name
    except ValueError:
        sig = str(signum)
    # use lazy %-style args throughout instead of mixing f-strings with %s
    logger.warning('caught signal: %s in FRAME=\n%s', sig, '\n'.join(traceback.format_stack(frame)))
    logger.warning('will terminate pid=%d', os.getpid())
    logging.shutdown()
    kill_processes(os.getpid())


if __name__ == '__main__':
"""
Main function of the remote file open script.
Expand All @@ -167,6 +206,9 @@ def spawn_file_open_thread(queues, file_list):
establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, filename=logname)
logger = logging.getLogger(__name__)

logger.info('setting up signal handling')
register_signals([signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGSEGV, signal.SIGXCPU, signal.SIGUSR1, signal.SIGBUS], args)

# get the file info
file_list_dictionary = get_file_lists(args.turls)
turls = file_list_dictionary.get('turls')
Expand Down
9 changes: 8 additions & 1 deletion pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ def open_remote_files(indata, workdir, nthreads):
cmd = create_root_container_command(workdir, _cmd)

show_memory_usage()

timeout = len(indata) * 30 + 600
logger.info('executing file open verification script (timeout=%d):\n\n\'%s\'\n\n', timeout, cmd)

Expand All @@ -238,6 +237,9 @@ def open_remote_files(indata, workdir, nthreads):
elif _exitcode:
if exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILECOULDNOTBEOPENED:
exitcode = errors.REMOTEFILEOPENTIMEDOUT
elif exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILEDICTDOESNOTEXIST:
exitcode = errors.REMOTEFILEOPENTIMEDOUT
diagnostics = f'remote file open command was timed-out and: {diagnostics}' # cannot give further info
else: # REMOTEFILECOULDNOTBEOPENED
exitcode = _exitcode
else:
Expand Down Expand Up @@ -268,6 +270,11 @@ def parse_remotefileverification_dictionary(workdir):
config.Pilot.remotefileverification_dictionary
)

if not os.path.exists(dictionary_path):
diagnostics = f'file {dictionary_path} does not exist'
logger.warning(diagnostics)
return errors.REMOTEFILEDICTDOESNOTEXIST, diagnostics, not_opened

file_dictionary = read_json(dictionary_path)
if not file_dictionary:
diagnostics = 'could not read dictionary from %s' % dictionary_path
Expand Down
53 changes: 52 additions & 1 deletion pilot/user/atlas/jobmetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-2023

import os
import re
import logging

from pilot.api import analytics
from pilot.util.jobmetrics import get_job_metrics_entry
from pilot.util.features import MachineFeatures, JobFeatures
from pilot.util.filehandling import find_last_line
from pilot.util.math import float_to_rounded_string

from .cpu import get_core_count
from .common import get_db_info, get_resimevents
Expand Down Expand Up @@ -70,6 +72,9 @@ def get_job_metrics_string(job):
else:
logger.info("will not add max space = %d B to job metrics", max_space)

# add job and machine feature data if available
job_metrics = add_features(job_metrics, corecount, add=['hs06'])

# get analytics data
job_metrics = add_analytics_data(job_metrics, job.workdir, job.state)

Expand All @@ -79,6 +84,52 @@ def get_job_metrics_string(job):
return job_metrics


def add_features(job_metrics, corecount, add=None):
    """
    Add job and machine feature data to the job metrics if available.

    If a non-empty add list is specified, only include the corresponding features.
    If empty/not specified, add all. The machine-feature hs06 value is rescaled to
    the job's core count before being appended.

    :param job_metrics: job metrics (string).
    :param corecount: core count (int).
    :param add: features to be added (list, or None for all).
    :return: updated job metrics (string).
    """

    # avoid the mutable-default-argument pitfall (was add=[])
    add = add if add is not None else []

    if job_metrics and not job_metrics.endswith(' '):
        job_metrics += ' '

    def get_features_string(features_dic):
        # serialize the selected features as 'key=value ' pairs; keys not in
        # a non-empty add list are skipped, falsy values are dropped
        features_str = ''
        for key, value in features_dic.items():
            if add and key not in add:
                continue
            if value:
                features_str += f'{key}={value} '
        return features_str

    machinefeatures = MachineFeatures().get()
    jobfeatures = JobFeatures().get()

    # correct hs06 for corecount: hs06*perf_scale/total_cpu*corecount
    hs06 = machinefeatures.get('hs06', 0)
    total_cpu = machinefeatures.get('total_cpu', 0)
    if hs06 and total_cpu:
        perf_scale = 1
        try:
            machinefeatures_hs06 = 1.0 * int(float(hs06)) * perf_scale * corecount / (1.0 * int(float(total_cpu)))
            machinefeatures['hs06'] = float_to_rounded_string(machinefeatures_hs06, precision=2)
            logger.info(f"hs06={machinefeatures.get('hs06')} ({hs06}) total_cpu={total_cpu} corecount={corecount} perf_scale={perf_scale}")
        except (TypeError, ValueError) as exc:
            # TypeError also covers corecount=None; keep the metrics usable without hs06
            logger.warning(f'cannot process hs06 machine feature: {exc} (hs06={hs06}, total_cpu={total_cpu}, corecount={corecount})')

    for feature_item in (machinefeatures, jobfeatures):
        features_str = get_features_string(feature_item)
        if features_str:
            job_metrics += features_str

    return job_metrics


def add_analytics_data(job_metrics, workdir, state):
"""
Add the memory leak+chi2 analytics data to the job metrics.
Expand Down
4 changes: 0 additions & 4 deletions pilot/user/atlas/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ def create_input_file_metadata(file_dictionary, workdir, filename="PoolFileCatal
xml = ElementTree.tostring(data, encoding='utf8')
xml = minidom.parseString(xml).toprettyxml(indent=" ")

# add escape character for & (needed for google turls)
if '&' in xml:
xml = xml.replace('&', '&')

# stitch in the DOCTYPE
xml = xml.replace('<POOLFILECATALOG>', '<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">\n<POOLFILECATALOG>')

Expand Down
14 changes: 14 additions & 0 deletions pilot/user/atlas/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def verify_arcproxy(envsetup, limit, proxy_id="pilot", test=False): # noqa: C90
# validityLeft - duration of proxy validity left in seconds.
# vomsACvalidityEnd - timestamp when VOMS attribute validity ends.
# vomsACvalidityLeft - duration of VOMS attribute validity left in seconds.
cmd = f"{envsetup}arcproxy -i subject"
_exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True)
logger.info(f'subject={stdout}')

cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft"
_exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True)
if stdout is not None:
Expand Down Expand Up @@ -471,3 +475,13 @@ def extract_time_left_old(stdout):
logger.info(f"validity_end = {validity_end}")

return validity_end, stdout


def getproxy_dictionary(voms_role):
    """
    Prepare the dictionary for the getProxy call.

    The 'atlas' role additionally pins the DN to 'atlpilo2'; any other
    role yields a role-only dictionary.

    :param voms_role: VOMS role (string).
    :return: getProxy argument dictionary (dict).
    """

    proxy_args = {'role': voms_role}
    if voms_role == 'atlas':
        proxy_args['dn'] = 'atlpilo2'
    return proxy_args

0 comments on commit 23a777a

Please sign in to comment.