From ee41b9a4eecc1f5c2b7bd937aae6a1de0a809dc4 Mon Sep 17 00:00:00 2001 From: Iuri Oksuzian Date: Fri, 27 Mar 2026 11:26:43 -0500 Subject: [PATCH] Update Run1B/MDC2025 configs and improve pomsMonitor analysis tools MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Config updates: - Run1B: bump to Run1Bai campaign (dsconf, geometry v03→v06, run 1460, SimJob/Run1Bai); update beam flash filter settings (DetStepFilter) - MDC2025 reco: add tarball_append=-reco, add NoPrimaryMix1BBTriggerable - MDC2025 evntuple: update to MDC2025-002, add MDC2025af input datasets, bump to AnalysisMDC2025/v01_02_00 - MDC2025 mix: update FlatGamma to MDC2025ag, fix inloc/dsconf/setup - MDC2025 primary_muon: update RMCFlatGamma(Calo) to MDC2025ag, normalize input_data to dict format Monitoring/analysis improvements: - db_analyzer: add --since and --needs-processing filters; add ignore/unignore/list-ignored for dataset tracking; color-code output (yellow=unprocessed, grey=ignored, green=complete, red=incomplete) - pomsMonitor, db_builder, poms_db: supporting changes for new filters - fcldump, json2jobdef: minor fixes Co-Authored-By: Claude Sonnet 4.6 --- data/Run1B/resampler_beam_mixing.json | 56 ++++++++--------- data/mdc2025/evntuple.json | 32 +++++++--- data/mdc2025/mix.json | 16 ++--- data/mdc2025/primary_muon.json | 14 ++--- data/mdc2025/reco.json | 53 +++++++++++++++- utils/db_analyzer.py | 90 ++++++++++++++++++++++++++- utils/db_builder.py | 19 ++++-- utils/fcldump.py | 19 ++++-- utils/json2jobdef.py | 4 +- utils/pomsMonitor.py | 70 +++++++++++++++++++-- utils/poms_db.py | 6 ++ 11 files changed, 307 insertions(+), 72 deletions(-) diff --git a/data/Run1B/resampler_beam_mixing.json b/data/Run1B/resampler_beam_mixing.json index 29de18f..24485a4 100644 --- a/data/Run1B/resampler_beam_mixing.json +++ b/data/Run1B/resampler_beam_mixing.json @@ -1,11 +1,11 @@ [ { "desc": "NeutralsFlash", - "dsconf": "Run1Bag", + "dsconf": "Run1Bai-003", "fcl": "Production/JobConfig/pileup/NeutralsResampler.fcl", "fcl_overrides": { "#include": "Production/JobConfig/pileup/epilog_1b.fcl", - "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v03.txt", + "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v06.txt", "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_DSOff.txt", "physics.producers.compressDetStepMCs.compressionOptions.keepNGenerations": -1 }, @@ -13,8 +13,8 @@ "input_data": {"sim.mu2e.Neutrals.MDC2025ae3.art": 1}, "njobs": 5000, "events": 50000, - "run": 1450, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bag/setup.sh", + "run": 1460, + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bai/setup.sh", "owner": "mu2e", "inloc": "resilient", "outloc": { @@ -24,23 +24,22 @@ }, { "desc": "MuBeamFlash", - "dsconf": "Run1Bag", + "dsconf": "Run1Bai-007", "fcl": "Production/JobConfig/pileup/MuBeamResampler.fcl", "fcl_overrides": { "#include": "Production/JobConfig/pileup/epilog_1b.fcl", - "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v03.txt", + "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v06.txt", "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_DSOff.txt", - "physics.producers.compressDetStepMCs.compressionOptions.keepNGenerations": -1 + "physics.filters.DetStepFilter.StrawGasSteps": [], + "physics.filters.DetStepFilter.CrvSteps": [], + "physics.filters.DetStepFilter.MinimumSumCaloStepE": 1.0 }, "resampler_name": "beamResampler", - "input_data": { - "sim.mu2e.MuBeamCat.Run1Baa.art": 1 - }, + "input_data": {"sim.mu2e.MuBeamCat.Run1Bai.art": 1}, "njobs": 5000, "events": 400000, - "run": 1451, - "version": 1, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bag/setup.sh", + "run": 1460, + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bai/setup.sh", "owner": "mu2e", "inloc": "resilient", "outloc": { @@ -50,23 +49,22 @@ }, { "desc": "EleBeamFlash", - "dsconf": "Run1Bag", + "dsconf": "Run1Bai-007", "fcl": "Production/JobConfig/pileup/EleBeamResampler.fcl", "fcl_overrides": { "#include": "Production/JobConfig/pileup/epilog_1b.fcl", - "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v03.txt", + "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v06.txt", "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_DSOff.txt", - "physics.producers.compressDetStepMCs.compressionOptions.keepNGenerations": -1 + "physics.filters.DetStepFilter.StrawGasSteps": [], + "physics.filters.DetStepFilter.CrvSteps": [], + "physics.filters.DetStepFilter.MinimumSumCaloStepE": 1.0 }, "resampler_name": "beamResampler", - "input_data": { - "sim.mu2e.EleBeamCat.Run1Baa.art": 1 - }, + "input_data": {"sim.mu2e.EleBeamCat.Run1Bai.art": 1}, "njobs": 5000, "events": 1000000, - "run": 1451, - "version": 1, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bag/setup.sh", + "run": 1460, + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bai/setup.sh", "owner": "mu2e", "inloc": "resilient", "outloc": { @@ -76,22 +74,24 @@ }, { "desc": "MuStopPileup", - "dsconf": "Run1Bag", + "dsconf": "Run1Bai-007", "fcl": "Production/JobConfig/pileup/MuStopPileup.fcl", "fcl_overrides": { "#include": "Production/JobConfig/pileup/epilog_1b.fcl", - "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v03.txt", + "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_b_v06.txt", "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_DSOff.txt", - "physics.producers.compressDetStepMCs.compressionOptions.keepNGenerations": -1 + "physics.filters.DetStepFilter.StrawGasSteps": [], + "physics.filters.DetStepFilter.CrvSteps": [], + "physics.filters.DetStepFilter.MinimumSumCaloStepE": 1.0 }, "resampler_name": "TargetStopResampler", "input_data": { - "sim.mu2e.MuminusStopsCat.Run1Baa.art": 1 + "sim.mu2e.MuminusStopsCat.Run1Bai.art": 1 }, "njobs": 5000, "events": 400000, - "run": 1450, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bag/setup.sh", + "run": 1460, + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/Run1Bai/setup.sh", "owner": "mu2e", "inloc": "resilient", "outloc": { diff --git a/data/mdc2025/evntuple.json b/data/mdc2025/evntuple.json index c47894e..0712b4a 100644 --- a/data/mdc2025/evntuple.json +++ b/data/mdc2025/evntuple.json @@ -1,16 +1,28 @@ [ { - "dsconf": ["MDC2025-000"], + "dsconf": ["MDC2025-002"], "fcl": ["EventNtuple/fcl/from_mcs-mockdata.fcl"], "input_data": [ - {"mcs.mu2e.CeEndpointMix1BBTriggered.MDC2025ae_best_v1_3.art": 10}, - {"mcs.mu2e.CeEndpointOnSpillTriggered.MDC2025ae_best_v1_3.art": 4}, - {"mcs.mu2e.CeMLeadingLogOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, - {"mcs.mu2e.CePLeadingLogOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, - {"mcs.mu2e.CePlusEndpointMix1BBTriggered.MDC2025ae_best_v1_3.art": 10}, - {"mcs.mu2e.CePlusEndpointOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, - {"mcs.mu2e.CosmicSignalMix1BBTriggered.MDC2025ae_best_v1_3.art": 50}, - {"mcs.mu2e.CosmicSignalOnSpillTriggered.MDC2025ae_best_v1_3.art": 1} + {"mcs.mu2e.CePLeadingLogOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CeEndpointOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CeMLeadingLogOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CePlusEndpointOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CosmicSignalOnSpillTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CosmicCRYExtractedTriggered.MDC2025ae_best_v1_3.art": 1}, + {"mcs.mu2e.CosmicSignalOffSpillTriggered-CH.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.CosmicSignalOffSpillTriggered-LH.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.ensembleMDS3aOnSpillTriggered.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.RPCInternalPhysicalOnSpillTriggered.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.DIOtail95OnSpillTriggered.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.RPCExternalPhysicalOnSpillTriggered.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.ensembleMDS3aMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"mcs.mu2e.CeMLeadingLogMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"mcs.mu2e.FlateMinusMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"mcs.mu2e.RMCInternalOnSpillOnSpillTriggered.MDC2025af_best_v1_1.art": 1}, + {"mcs.mu2e.RMCExternalOnSpillOnSpillTriggered.MDC2025af_best_v1_1.art": 1}, + {"mcs.mu2e.NoPrimaryMix1BBTriggerable.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.FlatGammaMix1BBTriggerable.MDC2025af_best_v1_3.art": 1}, + {"mcs.mu2e.FlatGammaCaloMix1BBTriggerable.MDC2025af_best_v1_3.art": 1} ], "fcl_overrides": [ { @@ -20,7 +32,7 @@ "inloc": ["tape"], "outloc": [{"*.root": "tape"}], - "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/AnalysisMDC2025/v01_00_00/setup.sh"] + "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/AnalysisMDC2025/v01_02_00/setup.sh"] }, { "dsconf": ["MDC2025-000"], diff --git a/data/mdc2025/mix.json b/data/mdc2025/mix.json index 9dcb4f5..42ead37 100644 --- a/data/mdc2025/mix.json +++ b/data/mdc2025/mix.json @@ -59,8 +59,8 @@ }, { "input_data": [ - {"dts.mu2e.FlatGamma.MDC2025ac.art": 1}, - {"dts.mu2e.FlatGammaCalo.MDC2025ac.art": 1} + {"dts.mu2e.FlatGamma.MDC2025ag.art": 1}, + {"dts.mu2e.FlatGammaCalo.MDC2025ag.art": 1} ], "pileup_datasets": [{ "dts.mu2e.MuBeamFlashCat.MDC2025ac.art": 1, @@ -68,20 +68,22 @@ "dts.mu2e.NeutralsFlashCat.MDC2025ad.art": 1, "dts.mu2e.MuStopPileupCat.MDC2025ac.art": 2 }], - "dsconf": ["MDC2025af_best_v1_1"], + "dsconf": ["MDC2025ag_best_v1_3"], "mixconf": [2], "pbeam": ["Mix1BB"], "owner": ["mu2e"], - "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025af/setup.sh"], + "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ag/setup.sh"], "fcl": ["Production/JobConfig/mixing/Mix.fcl"], "merge_events": [500], - "inloc": ["tape"], + "inloc": ["resilient"], "outloc": [{"dig.mu2e.*.art": "tape"}], "fcl_overrides": [ { "services.DbService.purpose": "Sim_best", - "services.DbService.version": "v1_1", - "#include": "Production/JobConfig/mixing/NoPrimary.fcl" + "services.DbService.version": "v1_3", + "#include": "Production/JobConfig/mixing/NoPrimary.fcl", + "outputs.Output.fileName": "dig.mu2e.{desc}.{dsconf}.sequence.art" + } ] } diff --git a/data/mdc2025/primary_muon.json b/data/mdc2025/primary_muon.json index 3513dd7..17c4f8c 100644 --- a/data/mdc2025/primary_muon.json +++ b/data/mdc2025/primary_muon.json @@ -279,18 +279,18 @@ }, { "desc": "RMCFlatGamma", - "dsconf": "MDC2025ac", + "dsconf": "MDC2025ag", "fcl": "Production/JobConfig/primary/FlatGamma.fcl", "fcl_overrides": { "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_no_tsu_ps_v01.txt", "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_a.txt" }, "resampler_name": "TargetStopResampler", - "input_data": "sim.mu2e.MuminusStopsCat.MDC2025ac.art", + "input_data": {"sim.mu2e.MuminusStopsCat.MDC2025ac.art": 1}, "njobs": 500, "events": 2000000, "run": 1430, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ac/setup.sh", + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ag/setup.sh", "owner": "mu2e", "inloc": "disk", "outloc": { @@ -300,18 +300,18 @@ }, { "desc": "RMCFlatGammaCalo", - "dsconf": "MDC2025ac", + "dsconf": "MDC2025ag", "fcl": "Production/JobConfig/primary/FlatGammaCalo.fcl", "fcl_overrides": { "services.GeometryService.bFieldFile": "Offline/Mu2eG4/geom/bfgeom_no_tsu_ps_v01.txt", "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_a.txt" }, "resampler_name": "TargetStopResampler", - "input_data": "sim.mu2e.MuminusStopsCat.MDC2025ac.art", + "input_data": {"sim.mu2e.MuminusStopsCat.MDC2025ac.art": 1}, "njobs": 2000, "events": 7500, "run": 1430, - "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ac/setup.sh", + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ag/setup.sh", "owner": "mu2e", "inloc": "disk", "outloc": { @@ -328,7 +328,7 @@ "services.GeometryService.inputFile": "Offline/Mu2eG4/geom/geom_run1_a.txt" }, "resampler_name": "TargetStopResampler", - "input_data": "sim.mu2e.MuminusStopsCat.MDC2025ac.art", + "input_data": {"sim.mu2e.MuminusStopsCat.MDC2025ac.art": 1}, "njobs": 500, "events": 20000, "run": 1430, diff --git a/data/mdc2025/reco.json b/data/mdc2025/reco.json index deff093..b487206 100644 --- a/data/mdc2025/reco.json +++ b/data/mdc2025/reco.json @@ -1,6 +1,7 @@ [ { "dsconf": ["MDC2025af_best_v1_3"], + "tarball_append": "-reco", "fcl": ["Production/JobConfig/recoMC/OnSpill.fcl"], "input_data": [ {"dig.mu2e.CeEndpointOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, @@ -12,7 +13,8 @@ {"dig.mu2e.CePlusEndpointMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, {"dig.mu2e.CosmicSignalMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, {"dig.mu2e.FlatGammaCaloMix1BBTriggerable.MDC2025af_best_v1_1.art": 1}, - {"dig.mu2e.FlatGammaMix1BBTriggerable.MDC2025af_best_v1_1.art": 1} + {"dig.mu2e.FlatGammaMix1BBTriggerable.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.NoPrimaryMix1BBTriggerable.MDC2025af_best_v1_1.art": 1} ], "fcl_overrides": [{"outputs.LoopHelixOutput.fileName": "mcs.owner.{desc}.version.sequencer.art"}], "inloc": ["tape"], @@ -30,5 +32,52 @@ "inloc": "tape", "outloc": {"*.art": "tape"}, "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025af/setup.sh" - } + }, + { + "dsconf": ["MDC2025af_best_v1_3"], + "tarball_append": "-reco", + "fcl": ["Production/JobConfig/recoMC/OnSpill.fcl"], + "input_data": [ + {"dig.mu2e.CeEndpointOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, + {"dig.mu2e.CeMLeadingLogOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, + {"dig.mu2e.CePLeadingLogOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, + {"dig.mu2e.CePlusEndpointOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, + {"dig.mu2e.CosmicSignalOnSpillTriggered.MDC2025ad_best_v1_3.art": 1}, + {"dig.mu2e.CeEndpointMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.CePlusEndpointMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.CosmicSignalMix1BBTriggered.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.FlatGammaCaloMix1BBTriggerable.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.FlatGammaMix1BBTriggerable.MDC2025af_best_v1_1.art": 1}, + {"dig.mu2e.NoPrimaryMix1BBTriggerable.MDC2025af_best_v1_1.art": 1} + ], + "fcl_overrides": [{"outputs.LoopHelixOutput.fileName": "mcs.owner.{desc}.version.sequencer.art"}], + "inloc": ["tape"], + "outloc": [{"*.art": "disk"}], + "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025af/setup.sh"] + }, + { + "dsconf": "MDC2025af_best_v1_3", + "desc": "CosmicCRYExtractedTriggeredReco", + "fcl": "Production/JobConfig/recoMC/Extracted.fcl", + "input_data": + {"dig.mu2e.CosmicCRYExtractedTriggered.MDC2025ae_best_v1_3.art": 1}, + "fcl_overrides": + {"outputs.KinematicLineOutput.fileName": "mcs.owner.CosmicCRYExtractedTriggered.version.sequencer.art"}, + "inloc": "tape", + "outloc": {"*.art": "tape"}, + "simjob_setup": "/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025af/setup.sh" + }, + { + "dsconf": ["MDC2025ag_best_v1_3"], + "tarball_append": "-reco", + "fcl": ["Production/JobConfig/recoMC/OnSpill.fcl"], + "input_data": [ + {"dig.mu2e.FlatGammaMix1BB.MDC2025ag_best_v1_3.art": 1}, + {"dig.mu2e.FlatGammaCaloMix1BB.MDC2025ag_best_v1_3.art": 1} + ], + "fcl_overrides": [{"outputs.LoopHelixOutput.fileName": "mcs.owner.{desc}.version.sequencer.art"}], + "inloc": ["tape"], + "outloc": [{"*.art": "disk"}], + "simjob_setup": ["/cvmfs/mu2e.opensciencegrid.org/Musings/SimJob/MDC2025ag/setup.sh"] + } ] \ No newline at end of file diff --git a/utils/db_analyzer.py b/utils/db_analyzer.py index d883a05..a55361f 100644 --- a/utils/db_analyzer.py +++ b/utils/db_analyzer.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Read-only analysis utilities over the prodtools SQLite database.""" +"""Analysis utilities over the prodtools SQLite database.""" import os import sys @@ -117,6 +117,8 @@ def list_jobs( complete_only: bool = False, incomplete_only: bool = False, datasets_only: bool = False, + since=None, + needs_processing: bool = False, ) -> None: jobs = _collect_jobs(session, pattern) @@ -129,6 +131,34 @@ def list_jobs( or (job.source_file and campaign in job.source_file) ] + if since is not None: + # Keep job if at least one output dataset was created after `since` + def _job_has_recent_output(job): + for output in job.outputs: + if not output.dataset: + continue + info = session.query(DatasetInfo).filter_by(dataset_name=output.dataset).one_or_none() + if info and info.creation_date and info.creation_date >= since: + return True + return False + jobs = [job for job in jobs if _job_has_recent_output(job)] + + if needs_processing: + # Keep jobs that have at least one complete output with no children + # and that is not a terminal product (nts.*) + def _has_unprocessed_output(job): + njobs = job.njobs or 0 + for output in job.outputs: + if not output.dataset: + continue + if output.dataset.startswith('nts.'): + continue + info = session.query(DatasetInfo).filter_by(dataset_name=output.dataset).one_or_none() + if info and info.nfiles and info.nfiles >= njobs and not info.has_children and not info.ignored: + return True + return False + jobs = [job for job in jobs if _has_unprocessed_output(job)] + if sort_by == "njobs": jobs.sort(key=lambda j: j.njobs or 0, reverse=True) elif sort_by == "tarball": @@ -177,7 +207,24 @@ def list_jobs( print(f"{job.njobs or 0:>8} {'':>10} {'':>14} {'':>6} {display_name:<80}") for dataset_name, nfiles, nevts, total_size, location in outputs: avg_size_mb = (total_size / nfiles / 1e6) if nfiles else 0 - color = '\033[92m' if nfiles >= (job.njobs or 0) else '\033[91m' + is_complete_out = nfiles >= (job.njobs or 0) + info = info_map.get(dataset_name) + is_ignored = info is not None and info.ignored + is_unprocessed = ( + is_complete_out + and not dataset_name.startswith('nts.') + and info is not None + and not info.has_children + and not is_ignored + ) + if is_ignored: + color = '\033[90m' # dark grey = ignored + elif is_unprocessed: + color = '\033[93m' # yellow = complete but no children + elif is_complete_out: + color = '\033[92m' # green = complete + else: + color = '\033[91m' # red = incomplete reset = '\033[0m' padded_dataset = f" {dataset_name}" print( @@ -190,3 +237,42 @@ def list_jobs( print(f"{job.njobs or 0:>8} {job.inloc or 'N/A':<8} {first_location:<8} {source_file:<25} {display_name:<80}") +def ignore_dataset(session, dataset_name: str, reason: str = None) -> bool: + """Mark a dataset as ignored for needs-processing checks. + + Creates a DatasetInfo stub if the dataset is not yet in the DB. + Returns True if the dataset was found/created, False on error. + """ + info = session.query(DatasetInfo).filter_by(dataset_name=dataset_name).one_or_none() + if info is None: + info = DatasetInfo(dataset_name=dataset_name, nfiles=0, nevts=0, total_size=0) + session.add(info) + info.ignored = True + if reason: + info.ignore_reason = reason + session.commit() + return True + + +def unignore_dataset(session, dataset_name: str) -> bool: + """Remove the ignored flag from a dataset. Returns False if not found.""" + info = session.query(DatasetInfo).filter_by(dataset_name=dataset_name).one_or_none() + if info is None: + return False + info.ignored = False + info.ignore_reason = None + session.commit() + return True + + +def list_ignored(session) -> None: + """Print all datasets currently marked as ignored.""" + rows = session.query(DatasetInfo).filter_by(ignored=True).order_by(DatasetInfo.dataset_name).all() + if not rows: + print("No datasets are currently ignored.") + return + print(f"{'DATASET':<80} {'REASON'}") + print(f"{'-'*80} {'------'}") + for row in rows: + print(f"{row.dataset_name:<80} {row.ignore_reason or ''}") + diff --git a/utils/db_builder.py b/utils/db_builder.py index 531c528..10834f5 100644 --- a/utils/db_builder.py +++ b/utils/db_builder.py @@ -167,18 +167,27 @@ def _is_output_complete(session, output, njobs): return info and info.nfiles and info.nfiles >= njobs -def build_db(pattern: str, db_path: str, poms_dir: str = "/exp/mu2e/app/users/mu2epro/production_manager/poms_map", limit: int = None) -> None: +def build_db(pattern: str, db_path: str, poms_dir: str = "/exp/mu2e/app/users/mu2epro/production_manager/poms_map", limit: int = None, since=None) -> None: """Create and populate the SQLite DB from POMS JSONs matching pattern. - Creates DB and tables if missing - Updates existing jobs or creates new ones from JSON files - Preserves existing metrics (avg_real_h, avg_vmhwm_gb) when updating - Resolves template-mode njobs via `defname:` when needed + - If `since` (datetime) is given, only re-processes JSON files modified after that cutoff """ session = get_db_session(db_path) - json_files = sorted(glob.glob(f"{poms_dir}/{pattern}.json")) - print(f"Loading {len(json_files)} JSON files...") + all_json_files = sorted(glob.glob(f"{poms_dir}/{pattern}.json")) + if since is not None: + import time + cutoff = since.timestamp() + json_files = [f for f in all_json_files if os.path.getmtime(f) >= cutoff] + print(f"Loading {len(json_files)} JSON files modified since {since.strftime('%Y-%m-%d')} " + f"(skipping {len(all_json_files) - len(json_files)} unchanged)...") + else: + json_files = all_json_files + print(f"Loading {len(json_files)} JSON files...") # Track tarballs we see in JSON files (to remove jobs that no longer exist) seen_tarballs = set() @@ -252,7 +261,9 @@ def build_db(pattern: str, db_path: str, poms_dir: str = "/exp/mu2e/app/users/mu count += 1 # Remove jobs that are no longer in JSON files (cascade deletes JobOutputs automatically) - if seen_tarballs: + # When using --since, only remove jobs from the files we actually processed + # (don't touch jobs from files we skipped) + if seen_tarballs and since is None: removed = session.query(Job).filter(~Job.tarball.in_(seen_tarballs)).delete(synchronize_session=False) if removed > 0: print(f"Removed {removed} jobs no longer in JSON files") diff --git a/utils/fcldump.py b/utils/fcldump.py index b783ee9..03527bb 100755 --- a/utils/fcldump.py +++ b/utils/fcldump.py @@ -46,16 +46,25 @@ def find_matching_jobdef(jobdefs, desc, input_type=None): Path to matching tarball, or None if no match found """ matches = [] - + for jobdef in jobdefs: - # Locate the tarball first - tarball_path = locate_tarball(jobdef) - + # Quick name-based pre-filter: jobdef description field (parts[2]) must match desc + jobdef_parts = jobdef.split('.') + if len(jobdef_parts) >= 3 and jobdef_parts[2] != desc: + continue + + # Locate the tarball, skip if not available + try: + tarball_path = locate_tarball(jobdef) + except RuntimeError as e: + print(f"Skipping {jobdef}: {e}") + continue + # Use Mu2eJobIO class to get output files from utils.jobiodetail import Mu2eJobIO job_io = Mu2eJobIO(tarball_path) outputs = job_io.job_outputs(0) - + # Check for exact match: desc should be the third field in output filename for output_file in outputs.values(): output_parts = output_file.split('.') diff --git a/utils/json2jobdef.py b/utils/json2jobdef.py index 4b4da7b..a73390e 100755 --- a/utils/json2jobdef.py +++ b/utils/json2jobdef.py @@ -622,8 +622,8 @@ def find_json_entry(configs, desc=None, dsconf=None, index=None): def process_all_for_dsconf(expanded_configs, dsconf, args): """Process all entries matching the specified dsconf and generate job definitions for all permutations""" - # Filter to only entries matching the specified dsconf (partial match) - matching_configs = [config for config in expanded_configs if config.get('dsconf', '').startswith(dsconf)] + # Filter to only entries matching the specified dsconf (exact match) + matching_configs = [config for config in expanded_configs if config.get('dsconf', '') == dsconf] if not matching_configs: sys.exit(f"No entries found matching dsconf: {dsconf}") diff --git a/utils/pomsMonitor.py b/utils/pomsMonitor.py index 2401967..11f044c 100755 --- a/utils/pomsMonitor.py +++ b/utils/pomsMonitor.py @@ -4,15 +4,37 @@ import os import sys import argparse +from datetime import datetime, timedelta # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.db_builder import build_db -from utils.db_analyzer import list_jobs, get_default_db_path +from utils.db_analyzer import list_jobs, get_default_db_path, ignore_dataset, unignore_dataset, list_ignored from utils.poms_db import get_db_session +def _parse_since(since_str): + """Parse --since argument into a datetime cutoff. + + Accepts: + - Nd (e.g. 7d) → N days ago + - Nw (e.g. 1w) → N weeks ago + - YYYY-MM-DD → specific date + """ + s = since_str.strip() + if s.endswith('d') and s[:-1].isdigit(): + return datetime.utcnow() - timedelta(days=int(s[:-1])) + if s.endswith('w') and s[:-1].isdigit(): + return datetime.utcnow() - timedelta(weeks=int(s[:-1])) + try: + return datetime.strptime(s, '%Y-%m-%d') + except ValueError: + raise argparse.ArgumentTypeError( + f"Invalid --since value '{since_str}'. Use Nd, Nw, or YYYY-MM-DD." + ) + + def main(): parser = argparse.ArgumentParser(description="Analyze POMS jobdesc JSON files") parser.add_argument('--pattern', default='MDC202*', help='POMS JSON file pattern (default: MDC202*)') @@ -25,15 +47,51 @@ def main(): parser.add_argument('--complete', action='store_true', help='Show only complete datasets (requires --outputs)') parser.add_argument('--incomplete', action='store_true', help='Show only incomplete datasets (requires --outputs)') parser.add_argument('--datasets-only', action='store_true', help='Print only dataset names (implies --outputs)') + parser.add_argument('--since', metavar='DURATION', + help='Show only datasets created after this cutoff. ' + 'Examples: 7d (7 days), 1w (1 week), 2026-03-10') + parser.add_argument('--needs-processing', action='store_true', + help='Show only complete datasets that have no downstream children ' + '(i.e., ready for the next production step). ' + 'Highlights them in yellow when used with --outputs.') + parser.add_argument('--ignore', metavar='DATASET', + help='Mark a dataset as ignored for --needs-processing checks') + parser.add_argument('--ignore-reason', metavar='REASON', + help='Optional reason to record when using --ignore') + parser.add_argument('--unignore', metavar='DATASET', + help='Remove the ignored flag from a dataset') + parser.add_argument('--list-ignored', action='store_true', + help='List all datasets currently marked as ignored') args = parser.parse_args() - + + since_dt = None + if args.since: + since_dt = _parse_since(args.since) + + session = get_db_session(args.db) + + # Handle ignore management commands (these exit early) + if args.ignore: + if ignore_dataset(session, args.ignore, reason=args.ignore_reason): + print(f"Ignored: {args.ignore}") + return + + if args.unignore: + if unignore_dataset(session, args.unignore): + print(f"Unignored: {args.unignore}") + else: + print(f"Dataset not found in DB: {args.unignore}") + return + + if args.list_ignored: + list_ignored(session) + return + if args.datasets_only: args.outputs = True if args.build_db: - build_db(args.pattern, args.db) - - session = get_db_session(args.db) + build_db(args.pattern, args.db, since=since_dt) show_outputs = ( args.outputs @@ -52,6 +110,8 @@ def main(): complete_only=args.complete, incomplete_only=args.incomplete, datasets_only=args.datasets_only, + since=since_dt, + needs_processing=args.needs_processing, ) diff --git a/utils/poms_db.py b/utils/poms_db.py index 96adc49..227b365 100644 --- a/utils/poms_db.py +++ b/utils/poms_db.py @@ -67,6 +67,8 @@ class DatasetInfo(Base): location = Column(String) has_children = Column(Boolean, default=False) # True if any file in dataset has child files creation_date = Column(DateTime) # Creation date from SAM definition + ignored = Column(Boolean, default=False) # True if dataset should be excluded from needs-processing + ignore_reason = Column(String) # Optional note explaining why it's ignored # Performance metrics (averages from job logs) avg_real_h = Column(Float) # Average wall time in hours avg_vmhwm_gb = Column(Float) # Average high-water-mark memory in GB @@ -97,6 +99,10 @@ def get_db_session(db_path=None): columns = [row[1] for row in result] if 'location' not in columns: conn.exec_driver_sql("ALTER TABLE dataset_info ADD COLUMN location TEXT") + if 'ignored' not in columns: + conn.exec_driver_sql("ALTER TABLE dataset_info ADD COLUMN ignored INTEGER DEFAULT 0") + if 'ignore_reason' not in columns: + conn.exec_driver_sql("ALTER TABLE dataset_info ADD COLUMN ignore_reason TEXT") except Exception: pass Session = sessionmaker(bind=engine)