60 changes: 27 additions & 33 deletions MC/analysis_testing/o2dpg_analysis_test_utils.py
@@ -4,8 +4,10 @@
# Analysis task utilities
#
import sys
from os import environ
from os.path import join, exists, abspath, expanduser
from os import environ, listdir
from os.path import join, abspath

import json

# make sure O2DPG + O2 is loaded
O2DPG_ROOT=environ.get('O2DPG_ROOT')
@@ -22,40 +24,32 @@
ANALYSIS_VALID_DATA = "data"
ANALYSIS_COLLISION_SYSTEM_PP = "pp"
ANALYSIS_COLLISION_SYSTEM_PBPB = "pbpb"
ANALYSIS_CONFIGURATION_PREFIX = "analysis-testing"
ANALYSIS_DEFAULT_CONFIGURATION = {ANALYSIS_COLLISION_SYSTEM_PP: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PP, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")},
ANALYSIS_COLLISION_SYSTEM_PBPB: {ANALYSIS_VALID_MC: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_MC}.json"),
ANALYSIS_VALID_DATA: join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "default", ANALYSIS_COLLISION_SYSTEM_PBPB, f"{ANALYSIS_CONFIGURATION_PREFIX}-{ANALYSIS_VALID_DATA}.json")}}


def sanitize_configuration_path(path):
# sanitize path
path = path.replace("json://", "")
if path[0] != "$":
# only do this if there is no potential environment variable given as the first part of the path
path = abspath(expanduser(path))
return f"json://{path}"


def get_default_configuration(data_or_mc, collision_system):
path = ANALYSIS_DEFAULT_CONFIGURATION.get(collision_system, None)
if not path:
print(f"ERROR: Unknown collision system {collision_system}")
return None
return path[data_or_mc]


def get_configuration(analysis_name, data_or_mc, collision_system):
path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", analysis_name, collision_system, f"{ANALYSIS_CONFIGURATION_PREFIX}-{data_or_mc}.json")
if not exists(path):
path = get_default_configuration(data_or_mc, collision_system)
if not path:
return None
print(f"INFO: Use default configuration for {analysis_name}")
return sanitize_configuration_path(path)
def adjust_configuration_line(line, data_or_mc, collision_system):
line = line.replace('!ANALYSIS_QC_is_mc!', str(data_or_mc == ANALYSIS_VALID_MC).lower())
line = line.replace('!ANALYSIS_QC_is_data!', str(data_or_mc == ANALYSIS_VALID_DATA).lower())
return line


return sanitize_configuration_path(path)
def adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir):

final_config = {}
path = join(O2DPG_ROOT, "MC", "config", "analysis_testing", "json", "dpl")
for config_path in listdir(path):
if not config_path.endswith('.json'):
continue
json_string = ""
with open(join(path, config_path), 'r') as f:
for line in f:
json_string += adjust_configuration_line(line, data_or_mc, collision_system)
final_config |= json.loads(json_string)
# now we can do some adjustments
output_path = abspath(join(output_dir, 'dpl-config.json'))
with open(output_path, 'w') as f:
json.dump(final_config, f, indent=2)

return output_path


def get_collision_system(collision_system=None):
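The two new helpers replace the per-analysis configuration lookup with a single merged DPL configuration: every template under MC/config/analysis_testing/json/dpl is read line by line, the !ANALYSIS_QC_is_mc! and !ANALYSIS_QC_is_data! tokens are substituted, and the parsed dictionaries are merged into one dpl-config.json. A minimal self-contained sketch of that mechanism, using a made-up template string instead of the real template files:

    import json

    # Hypothetical template fragment; the real templates live under
    # MC/config/analysis_testing/json/dpl and use the same placeholder tokens.
    template = '''{
      "analysis-task-example": {
        "processMC": "!ANALYSIS_QC_is_mc!",
        "processData": "!ANALYSIS_QC_is_data!"
      }
    }'''

    def substitute(line, is_mc):
        # mirrors adjust_configuration_line: tokens become "true" / "false"
        line = line.replace('!ANALYSIS_QC_is_mc!', str(is_mc).lower())
        line = line.replace('!ANALYSIS_QC_is_data!', str(not is_mc).lower())
        return line

    final_config = {}
    json_string = ''.join(substitute(line, is_mc=True)
                          for line in template.splitlines(keepends=True))
    final_config |= json.loads(json_string)  # dict union, needs Python >= 3.9

    with open('dpl-config.json', 'w') as f:
        json.dump(final_config, f, indent=2)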
116 changes: 58 additions & 58 deletions MC/analysis_testing/o2dpg_analysis_test_workflow.py
@@ -63,7 +63,7 @@
import sys
import importlib.util
import argparse
from os import environ
from os import environ, makedirs
from os.path import join, exists, abspath, expanduser
import json

@@ -90,7 +90,7 @@
from o2dpg_analysis_test_utils import *


def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", is_mc=False):
def create_ana_task(name, cmd, output_dir, *, cpu=1, mem='2000', needs=None, extraarguments="-b", is_mc=False):
"""Quick helper to create analysis task

This creates an analysis task from various arguments
@@ -114,7 +114,7 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", i
if needs is None:
# set to empty list
needs = []
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=1, mem='2000', needs=needs)
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=cpu, mem=mem, needs=needs)
if is_mc:
task["labels"].append(ANALYSIS_LABEL_ON_MC)
task['cmd'] = f"{cmd} {extraarguments}"
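With the extended signature, callers can size individual tasks instead of inheriting the previously hard-coded 1 CPU / 2000 MB. A hypothetical call (the task name, command, and dependency are made up; createTask and full_ana_name come from the surrounding O2DPG helpers):

    # Hypothetical: a heavier analysis task requesting 2 CPUs and 4000 MB.
    task = create_ana_task('MyHeavyAnalysis', 'o2-analysis-myheavytask', './Analysis',
                           cpu=2, mem='4000', needs=['aod_merge'], is_mc=True)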
@@ -138,38 +138,6 @@ def load_analyses(analyses_only=None, include_disabled_analyses=False):
return collect_analyses


def add_analysis_post_processing_tasks(workflow):
"""add post-processing step to analysis tasks if possible

Args:
workflow: list
current list of tasks
"""
analyses_to_add_for = {}
# collect analyses in current workflow
for task in workflow:
if ANALYSIS_LABEL in task["labels"]:
analyses_to_add_for[task["name"]] = task

for ana in load_analyses(include_disabled_analyses=True):
if not ana["expected_output"]:
continue
ana_name_raw = ana["name"]
post_processing_macro = join(O2DPG_ROOT, "MC", "analysis_testing", "post_processing", f"{ana_name_raw}.C")
if not exists(post_processing_macro):
continue
ana_name = full_ana_name(ana_name_raw)
if ana_name not in analyses_to_add_for:
continue
pot_ana = analyses_to_add_for[ana_name]
cwd = pot_ana["cwd"]
needs = [ana_name]
task = createTask(name=f"{ANALYSIS_LABEL}_post_processing_{ana_name_raw}", cwd=join(cwd, "post_processing"), lab=[ANALYSIS_LABEL, f"{ANALYSIS_LABEL}PostProcessing", ana_name_raw], cpu=1, mem='2000', needs=needs)
input_files = ",".join([f"../{eo}" for eo in ana["expected_output"]])
cmd = f"\\(\\\"{input_files}\\\",\\\"./\\\"\\)"
task["cmd"] = f"root -l -b -q {post_processing_macro}{cmd}"
workflow.append(task)

def get_additional_workflows(input_aod):
additional_workflows = []

@@ -207,7 +175,7 @@ def get_additional_workflows(input_aod):
return additional_workflows


def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, add_common_args=None):
def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, split_analyses=False):
"""Add default analyses to user workflow

Args:
@@ -238,38 +206,71 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
data_or_mc = ANALYSIS_VALID_MC if is_mc else ANALYSIS_VALID_DATA
collision_system = get_collision_system(collision_system)

# list of lists, each sub-list corresponds to one analysis pipe to be executed
analysis_pipes = []
# collect the names corresponding to analysis pipes
analysis_names = []
# cpu and mem of each task
analysis_cpu_mem = []
# a list of all tasks to be put together
merged_analysis_pipe = additional_workflows.copy()
# cpu and mem of merged analyses
merged_analysis_cpu_mem = [0, 0]

for ana in load_analyses(analyses_only, include_disabled_analyses=include_disabled_analyses):
if is_mc and not ana.get("valid_mc", False):
print(f"INFO: Analysis {ana['name']} not added since not valid in MC")
continue
if not is_mc and not ana.get("valid_data", False):
print(f"INFO: Analysis {ana['name']} not added since not valid in data")
continue

configuration = get_configuration(ana["name"], data_or_mc, collision_system)
if not configuration:
print(f"INFO: Analysis {ana['name']} excluded due to no valid configuration")
if analyses_only and ana['name'] not in analyses_only:
# filter on analyses if requested
continue
print(f"INFO: Analysis {ana['name']} uses configuration {configuration}")

add_common_args_ana = get_common_args_as_string(ana, add_common_args)
if not add_common_args_ana:
print(f"ERROR: Cannot parse common args for analysis {ana['name']}")
if split_analyses:
# only the individual analyses, no merged
analysis_pipes.append(ana['tasks'])
analysis_names.append(ana['name'])
analysis_cpu_mem.append((1, 2000))
continue

for i in additional_workflows:
if i not in ana["tasks"]:
# print("Appending extra task", i, "to analysis", ana["name"], "as it is not there yet and needed for conversion")
ana["tasks"].append(i)
piped_analysis = f" --configuration {configuration} | ".join(ana["tasks"])
piped_analysis += f" --configuration {configuration} --aod-file {input_aod}"
piped_analysis += add_common_args_ana
merged_analysis_pipe.extend(ana['tasks'])
# underestimate what a single analysis would take in the merged case.
# Putting everything into one big pipe does not mean that the resources scale the same!
merged_analysis_cpu_mem[0] += 0.5
merged_analysis_cpu_mem[1] += 700

if not split_analyses:
# add the merged analysis
analysis_pipes.append(merged_analysis_pipe)
analysis_names.append('MergedAnalyses')
# take at least the resources estimated for a single analysis
analysis_cpu_mem.append((max(1, merged_analysis_cpu_mem[0]), max(2000, merged_analysis_cpu_mem[1])))

# now we need to create the output directory where we want the final configurations to go
output_dir_config = join(output_dir, 'config')
if not exists(output_dir_config):
makedirs(output_dir_config)

configuration = adjust_and_get_configuration_path(data_or_mc, collision_system, output_dir_config)

for analysis_name, analysis_pipe, analysis_res in zip(analysis_names, analysis_pipes, analysis_cpu_mem):
# remove duplicates if they are there for any reason (especially in the merged case)
analysis_pipe = list(set(analysis_pipe))
analysis_pipe_assembled = []
for executable_string in analysis_pipe:
# the input executable might already come with some configuration; the very first token is the actual executable
executable_string += f' --configuration json://{configuration}'
analysis_pipe_assembled.append(executable_string)

# put together, add AOD and timeout if requested
analysis_pipe_assembled = ' | '.join(analysis_pipe_assembled)
analysis_pipe_assembled += f' --aod-file {input_aod} --shm-segment-size 3000000000 --readers 1 --aod-memory-rate-limit 500000000'
if timeout is not None:
piped_analysis += f" --time-limit {timeout}"
workflow.append(create_ana_task(ana["name"], piped_analysis, output_dir, needs=needs, is_mc=is_mc))
analysis_pipe_assembled += f' --time-limit {timeout}'

# append potential post-processing
add_analysis_post_processing_tasks(workflow)
workflow.append(create_ana_task(analysis_name, analysis_pipe_assembled, output_dir, cpu=analysis_res[0], mem=analysis_res[1], needs=needs, is_mc=is_mc))
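The assembly above is the core of the merged mode: each deduplicated executable list becomes one DPL pipe in which every stage points at the shared configuration, and the reader options are appended once at the end. A rough standalone sketch, with made-up executable names and paths:

    # Minimal sketch of the pipe assembly (hypothetical names/paths).
    tasks = ['o2-analysis-tracks', 'o2-analysis-events', 'o2-analysis-tracks']
    configuration = './Analysis/config/dpl-config.json'
    input_aod = './AO2D.root'
    timeout = 3600

    stages = [f'{exe} --configuration json://{configuration}'
              for exe in set(tasks)]  # duplicates collapse here
    cmd = ' | '.join(stages)
    cmd += (f' --aod-file {input_aod} --shm-segment-size 3000000000'
            ' --readers 1 --aod-memory-rate-limit 500000000')
    if timeout is not None:
        cmd += f' --time-limit {timeout}'
    print(cmd)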


def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
@@ -300,7 +301,6 @@ def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
# search through workflow stages if we can find the requested analysis
pot_ana = analyses_to_add_for[ana_name]
cwd = pot_ana["cwd"]
qc_tag = f"Analysis{ana_name_raw}"
needs = [ana_name]
provenance = "qc_mc" if ANALYSIS_LABEL_ON_MC in pot_ana["labels"] else "qc"
for eo in ana["expected_output"]:
@@ -325,7 +325,7 @@ def run(args):
### setup global environment variables which are valid for all tasks, set as first task
global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER": args.condition_not_after} if args.condition_not_after else None
workflow = [createGlobalInitTask(global_env)]
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args)
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, split_analyses=args.split_analyses)
if args.with_qc_upload:
add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name)
if not workflow:
@@ -351,8 +351,8 @@ def main():
parser.add_argument("--autoset-converters", dest="autoset_converters", action="store_true", help="Compatibility mode to automatically set the converters for the analysis")
parser.add_argument("--timeout", type=int, default=None, help="Timeout for analysis tasks in seconds.")
parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp")
parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis")
parser.add_argument('--condition-not-after', dest="condition_not_after", type=int, help="only consider CCDB objects not created after this timestamp (for TimeMachine)", default=3385078236000)
parser.add_argument('--split-analyses', dest='split_analyses', action='store_true', help='Split into single analyses pipes to be executed.')

parser.set_defaults(func=run)
args = parser.parse_args()
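End to end, a hypothetical caller now picks between the merged pipe and individual pipes via the new keyword (module imports and paths assumed to be set up):

    # Hypothetical usage of the new split/merged switch.
    workflow = []
    add_analysis_tasks(workflow, input_aod='./AO2D.root', output_dir='./Analysis',
                       is_mc=True, split_analyses=True)   # one task per analysis
    # With split_analyses=False (the default), a single 'MergedAnalyses' task is
    # created instead; its resource request grows by 0.5 CPU / 700 MB per analysis,
    # with a floor of 1 CPU / 2000 MB.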
39 changes: 0 additions & 39 deletions MC/analysis_testing/post_processing/PWGMMMDnDeta.C

This file was deleted.
