From ca6b9ae972d38e2f457863ba02971a910b599650 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Tue, 7 Dec 2021 12:25:57 -0600 Subject: [PATCH] Added FinalPath OutputModules on FinalPaths do not cause unscheduled execution. --- FWCore/Framework/src/ProductResolvers.cc | 10 +- FWCore/Integration/test/BuildFile.xml | 1 + FWCore/Integration/test/test_finalpath.sh | 79 +++++++++++++ FWCore/Integration/test/test_finalpath_cfg.py | 69 ++++++++++++ FWCore/ParameterSet/python/Config.py | 105 +++++++++++++++++- FWCore/ParameterSet/python/SequenceTypes.py | 10 +- .../ParameterSet/python/SequenceVisitors.py | 30 +++++ 7 files changed, 290 insertions(+), 14 deletions(-) create mode 100755 FWCore/Integration/test/test_finalpath.sh create mode 100644 FWCore/Integration/test/test_finalpath_cfg.py diff --git a/FWCore/Framework/src/ProductResolvers.cc b/FWCore/Framework/src/ProductResolvers.cc index 23b2e65c602a7..25de273846e77 100644 --- a/FWCore/Framework/src/ProductResolvers.cc +++ b/FWCore/Framework/src/ProductResolvers.cc @@ -436,15 +436,7 @@ namespace edm { SharedResourcesAcquirer*, ModuleCallingContext const*) const { if (!skipCurrentProcess and worker_) { - return resolveProductImpl([this]() { - edm::Exception ex(errors::UnimplementedFeature); - ex << "Attempting to run unscheduled module without doing prefetching"; - std::ostringstream ost; - ost << "Calling produce method for unscheduled module " << worker_->description()->moduleName() << "/'" - << worker_->description()->moduleLabel() << "'"; - ex.addContext(ost.str()); - throw ex; - }); + return resolveProductImpl([] {}); } return Resolution(nullptr); } diff --git a/FWCore/Integration/test/BuildFile.xml b/FWCore/Integration/test/BuildFile.xml index b1082508dbdda..8c0ab8259f948 100644 --- a/FWCore/Integration/test/BuildFile.xml +++ b/FWCore/Integration/test/BuildFile.xml @@ -529,5 +529,6 @@ + diff --git a/FWCore/Integration/test/test_finalpath.sh b/FWCore/Integration/test/test_finalpath.sh new file mode 100755 index 0000000000000..b699ddf116360 --- /dev/null +++ b/FWCore/Integration/test/test_finalpath.sh @@ -0,0 +1,79 @@ +#!/bin/bash + +LOCAL_TEST_DIR=${CMSSW_BASE}/src/FWCore/Integration/test +LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH} + +# Pass in name and status +function die { echo $1: status $2 ; echo === Log file === ; cat ${3:-/dev/null} ; echo === End log file === ; exit $2; } + +pushd ${LOCAL_TMP_DIR} + +cat < finalpath_expected_empty.log +EOF + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py >& finalpath.log || die "failed test_finalpath_cfg.py" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $? + + +cat < finalpath_expected_not_found.log +did not find thing '' TEST +did not find thing '' TEST +did not find thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule --task >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_not_found.log - || die "differences for test_finalpath_cfg.py --schedule --task" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --endpath" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --endpath" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --endpath" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --schedule --endpath" $? + + +cat < finalpath_expected_found.log +found thing '' TEST +found thing '' TEST +found thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task --schedule" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --task --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task --schedule" $? + + +cat < finalpath_expected_filter.log +did not find thing '' TEST +found thing '' TEST +did not find thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --filter" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --filter --task" $? diff --git a/FWCore/Integration/test/test_finalpath_cfg.py b/FWCore/Integration/test/test_finalpath_cfg.py new file mode 100644 index 0000000000000..d427dc9f5a0b9 --- /dev/null +++ b/FWCore/Integration/test/test_finalpath_cfg.py @@ -0,0 +1,69 @@ +import FWCore.ParameterSet.Config as cms +import argparse +import sys + +parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test FinalPath.') + +parser.add_argument("--schedule", help="use cms.Schedule", action="store_true") +parser.add_argument("--task", help="put EDProducer into a task", action="store_true") +parser.add_argument("--path", help="put a consumer of the product onto a Path", action="store_true") +parser.add_argument("--endpath", help="put a consumer of the product onto an EndPath", action="store_true") +parser.add_argument("--filter", action="store_true") +parser.add_argument("--tracer", help="add Tracer service", action="store_true") + +print(sys.argv) +argv = sys.argv[:] +if '--' in argv: + argv.remove("--") +args, unknown = parser.parse_known_args(argv) + + +process = cms.Process("TEST") + +process.MessageLogger.cerr.INFO.limit = 10000 + +process.source = cms.Source("EmptySource") + +process.maxEvents.input = 3 + +process.thing = cms.EDProducer("ThingProducer") + +scheduledPaths =[] +if args.path: + print("adding Path") + process.otherThing = cms.EDProducer("OtherThingProducer", thingTag = cms.InputTag("thing")) + p = cms.Path() + if args.filter: + process.fltr = cms.EDFilter("Prescaler", prescaleFactor = cms.int32(2), prescaleOffset=cms.int32(0)) + p += process.fltr + if not args.task: + p += process.thing + p += process.otherThing + process.p = p + scheduledPaths.append(process.p) + if args.task: + process.p.associate(cms.Task(process.thing)) + +if args.endpath: + print("adding EndPath") + process.out2 = cms.OutputModule("AsciiOutputModule",outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*")) + process.o = cms.EndPath(process.out2) + scheduledPaths.append(process.o) + if args.task: + process.o.associate(cms.Task(process.thing)) + +process.out = cms.OutputModule("GetProductCheckerOutputModule", verbose= cms.untracked.bool(True), outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*")) +process.f = cms.FinalPath(process.out) + +if args.schedule: + print("adding Schedule") + scheduledPaths.append(process.f) + process.schedule = cms.Schedule(*scheduledPaths) + if args.task: + process.schedule.associate(cms.Task(process.thing)) + +if args.tracer: + process.add_(cms.Service("Tracer")) + +process.options.numberOfThreads=3 +process.options.numberOfStreams=1 diff --git a/FWCore/ParameterSet/python/Config.py b/FWCore/ParameterSet/python/Config.py index 9a4f3ad5d3a6a..e3ea506de2abd 100644 --- a/FWCore/ParameterSet/python/Config.py +++ b/FWCore/ParameterSet/python/Config.py @@ -17,7 +17,7 @@ from .Modules import _Module from .SequenceTypes import * from .SequenceTypes import _ModuleSequenceType, _Sequenceable #extend needs it -from .SequenceVisitors import PathValidator, EndPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor +from .SequenceVisitors import PathValidator, EndPathValidator, FinalPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor from .MessageLogger import MessageLogger from . import DictTypes @@ -120,6 +120,7 @@ def __init__(self,name,*Mods): self.__dict__['_Process__outputmodules'] = {} self.__dict__['_Process__paths'] = DictTypes.SortedKeysDict() # have to keep the order self.__dict__['_Process__endpaths'] = DictTypes.SortedKeysDict() # of definition + self.__dict__['_Process__finalpaths'] = DictTypes.SortedKeysDict() # of definition self.__dict__['_Process__sequences'] = {} self.__dict__['_Process__tasks'] = {} self.__dict__['_Process__services'] = {} @@ -289,6 +290,10 @@ def endpaths_(self): """returns a dict of the endpaths that have been added to the Process""" return DictTypes.SortedAndFixedKeysDict(self.__endpaths) endpaths = property(endpaths_,doc="dictionary containing the endpaths for the process") + def finalpaths_(self): + """returns a dict of the finalpaths that have been added to the Process""" + return DictTypes.SortedAndFixedKeysDict(self.__finalpaths) + finalpaths = property(finalpaths_,doc="dictionary containing the finalpaths for the process") def sequences_(self): """returns a dict of the sequences that have been added to the Process""" return DictTypes.FixedKeysDict(self.__sequences) @@ -477,6 +482,9 @@ def __setattr__(self,name,value): s = self.__findFirstUsingModule(self.endpaths,oldValue) if s is not None: raise ValueError(msg1+"endpath "+s.label_()+msg2) + s = self.__findFirstUsingModule(self.finalpaths,oldValue) + if s is not None: + raise ValueError(msg1+"finalpath "+s.label_()+msg2) # In case of EDAlias, raise Exception always to avoid surprises if isinstance(newValue, EDAlias): @@ -503,6 +511,9 @@ def __setattr__(self,name,value): s = self.__findFirstUsingModule(self.endpaths,oldValue) if s is not None: raise ValueError(msg1+"endpath "+s.label_()+msg2) + s = self.__findFirstUsingModule(self.finalpaths,oldValue) + if s is not None: + raise ValueError(msg1+"finalpath "+s.label_()+msg2) if not self.__InExtendCall and (Schedule._itemIsValid(newValue) or isinstance(newValue, Task)): self._replaceInScheduleDirectly(name, newValue) @@ -649,6 +660,13 @@ def _placeEndPath(self,name,mod): except ModuleCloneError as msg: context = format_outerframe(4) raise Exception("%sThe module %s in endpath %s is unknown to the process %s." %(context, msg, name, self._Process__name)) + def _placeFinalPath(self,name,mod): + self._validateSequence(mod, name) + try: + self._place(name, mod, self.__finalpaths) + except ModuleCloneError as msg: + context = format_outerframe(4) + raise Exception("%sThe module %s in finalpath %s is unknown to the process %s." %(context, msg, name, self._Process__name)) def _placeSequence(self,name,mod): self._validateSequence(mod, name) self._place(name, mod, self.__sequences) @@ -804,6 +822,9 @@ def dumpConfig(self, options=PrintOptions()): config+=self._dumpConfigNamedList(self.endpaths_().items(), 'endpath', options) + config+=self._dumpConfigNamedList(self.finalpaths_().items(), + 'finalpath', + options) config+=self._dumpConfigUnnamedList(self.services_().items(), 'service', options) @@ -989,6 +1010,7 @@ def dumpPython(self, options=PrintOptions()): result+=self._dumpPythonList(self._itemsInDependencyOrder(self.sequences), options) result+=self._dumpPythonList(self.paths_(), options) result+=self._dumpPythonList(self.endpaths_(), options) + result+=self._dumpPythonList(self.finalpaths_(), options) result+=self._dumpPythonList(self.aliases_(), options) if not self.schedule_() == None: result += 'process.schedule = ' + self.schedule.dumpPython(options) @@ -1035,6 +1057,7 @@ def splitPython(self, options = PrintOptions()): parts.update(self._splitPythonList('sequences', self._itemsInDependencyOrder(self.sequences), options)) parts.update(self._splitPythonList('paths', self.paths_(), options)) parts.update(self._splitPythonList('paths', self.endpaths_(), options)) + parts.update(self._splitPythonList('paths', self.finalaths_(), options)) parts.update(self._splitPythonList('modules', self.aliases_(), options)) if options.targetDirectory is not None: @@ -1083,6 +1106,8 @@ def _replaceInSequences(self, label, new): sequenceable.replace(old,new) for sequenceable in self.endpaths.values(): sequenceable.replace(old,new) + for sequenceable in self.finalpaths.values(): + sequenceable.replace(old,new) def _replaceInTasks(self, label, new): old = getattr(self,label) for task in self.tasks.values(): @@ -1148,6 +1173,7 @@ def _insertPaths(self, processPSet, nodeVisitor): scheduledPaths = [] triggerPaths = [] endpaths = [] + finalpaths = [] if self.schedule_() == None: # make one from triggerpaths & endpaths for name in self.paths_(): @@ -1156,19 +1182,49 @@ def _insertPaths(self, processPSet, nodeVisitor): for name in self.endpaths_(): scheduledPaths.append(name) endpaths.append(name) + for name in self.finalpaths_(): + finalpaths.append(name) else: for path in self.schedule_(): pathname = path.label_() - scheduledPaths.append(pathname) if pathname in self.endpaths_(): endpaths.append(pathname) + scheduledPaths.append(pathname) + elif pathname in self.finalpaths_(): + finalpaths.append(pathname) + if 1 == len(finalpaths): + scheduledPaths.append("@finalPath") else: + scheduledPaths.append(pathname) triggerPaths.append(pathname) for task in self.schedule_()._tasks: task.resolve(self.__dict__) scheduleTaskValidator = ScheduleTaskValidator() task.visit(scheduleTaskValidator) task.visit(nodeVisitor) + # consolidate all final_paths into one EndPath + endPathWithFinalPathModulesName ="@finalPath" + finalPathEndPath = EndPath() + if finalpaths: + endpaths.append(endPathWithFinalPathModulesName) + scheduledPaths.append(endPathWithFinalPathModulesName) + finalpathValidator = FinalPathValidator() + modulesOnFinalPath = [] + for finalpathname in finalpaths: + iFinalPath = self.finalpaths_()[finalpathname] + iFinalPath.resolve(self.__dict__) + finalpathValidator.setLabel(finalpathname) + iFinalPath.visit(finalpathValidator) + if finalpathValidator.filtersOnFinalpaths or finalpathValidator.producersOnFinalpaths: + names = [p.label_ for p in finalpathValidator.filtersOnFinalpaths] + names.extend( [p.label_ for p in finalpathValidator.producersOnFinalpaths]) + raise RuntimeError("FinalPath %s has non OutputModules %s" % (finalpathname, ",".join(names))) + modulesOnFinalPath.extend(iFinalPath.moduleNames()) + for m in modulesOnFinalPath: + mod = getattr(self, m) + setattr(mod, "@onFinalPath", untracked.bool(True)) + finalPathEndPath += mod + processPSet.addVString(True, "@end_paths", endpaths) processPSet.addVString(True, "@paths", scheduledPaths) # trigger_paths are a little different @@ -1190,19 +1246,25 @@ def _insertPaths(self, processPSet, nodeVisitor): iPath.visit(pathCompositeVisitor) iPath.insertInto(processPSet, triggername, decoratedList) for endpathname in endpaths: - iEndPath = self.endpaths_()[endpathname] + if endpathname is not endPathWithFinalPathModulesName: + iEndPath = self.endpaths_()[endpathname] + else: + iEndPath = finalPathEndPath iEndPath.resolve(self.__dict__) endpathValidator.setLabel(endpathname) lister.initialize() iEndPath.visit(endpathCompositeVisitor) iEndPath.insertInto(processPSet, endpathname, decoratedList) processPSet.addVString(False, "@filters_on_endpaths", endpathValidator.filtersOnEndpaths) + def resolve(self,keepUnresolvedSequencePlaceholders=False): for x in self.paths.values(): x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) for x in self.endpaths.values(): x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) + for x in self.finalpaths.values(): + x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) if not self.schedule_() == None: for task in self.schedule_()._tasks: task.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) @@ -1231,6 +1293,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): schedNames = set(( x.label_() for x in self.schedule_())) names = set(self.paths) names.update(set(self.endpaths)) + names.update(set(self.finalpaths)) unneededPaths = names - schedNames for n in unneededPaths: delattr(self,n) @@ -1241,6 +1304,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): else: pths = list(self.paths.values()) pths.extend(self.endpaths.values()) + pths.extend(self.finalpaths.values()) temp = Schedule(*pths) usedModules=set(temp.moduleNames()) unneededModules = self._pruneModules(self.producers_(), usedModules) @@ -1256,6 +1320,9 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): for p in self.endpaths.values(): p.visit(sv) p.visit(tv) + for p in self.finalpaths.values(): + p.visit(sv) + p.visit(tv) def removeUnneeded(seqOrTasks, allSequencesOrTasks): _keepSet = set(( s for s in seqOrTasks if s.hasLabel_())) _availableSet = set(allSequencesOrTasks.values()) @@ -1272,7 +1339,7 @@ def removeUnneeded(seqOrTasks, allSequencesOrTasks): print(" modules:"+",".join(unneededModules)) print(" tasks:"+",".join(unneededTaskLabels)) print(" sequences:"+",".join(unneededSeqLabels)) - print(" paths/endpaths:"+",".join(unneededPaths)) + print(" paths/endpaths/finalpaths:"+",".join(unneededPaths)) def _pruneModules(self, d, scheduledNames): moduleNames = set(d.keys()) junk = moduleNames - scheduledNames @@ -2635,6 +2702,36 @@ def testPath(self): t = Path(p.a, p.t1, Task(), p.t1) self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a, cms.Task(), process.t1)\n') + def testFinalPath(self): + p = Process("test") + p.a = OutputModule("MyOutputModule") + p.b = OutputModule("YourOutputModule") + p.c = OutputModule("OurOutputModule") + path = FinalPath(p.a) + path *= p.b + path += p.c + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a*p.b+p.c) + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a+ p.b*p.c) + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a*(p.b+p.c)) + self.assertEqual(str(path),'a+b+c') + p.es = ESProducer("AnESProducer") + self.assertRaises(TypeError,Path,p.es) + + t = FinalPath() + self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath()\n') + + t = FinalPath(p.a) + self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath(process.a)\n') + + self.assertRaises(TypeError, FinalPath, Task()) + self.assertRaises(TypeError, FinalPath, p.a, Task()) + + p.prod = EDProducer("prodName") + p.t1 = Task(p.prod) + self.assertRaises(TypeError, FinalPath, p.a, p.t1, Task(), p.t1) def testCloneSequence(self): p = Process("test") a = EDAnalyzer("MyAnalyzer") diff --git a/FWCore/ParameterSet/python/SequenceTypes.py b/FWCore/ParameterSet/python/SequenceTypes.py index 85fbbe26c9af2..2ef9e92cf6b7a 100644 --- a/FWCore/ParameterSet/python/SequenceTypes.py +++ b/FWCore/ParameterSet/python/SequenceTypes.py @@ -651,6 +651,14 @@ def __init__(self,*arg,**argv): def _placeImpl(self,name,proc): proc._placeEndPath(name,self) +class FinalPath(_ModuleSequenceType): + def __init__(self,*arg,**argv): + super(FinalPath,self).__init__(*arg,**argv) + def _placeImpl(self,name,proc): + proc._placeFinalPath(name,self) + def associate(self,task): + raise TypeError("FinalPath does not allow associations with Tasks") + class Sequence(_ModuleSequenceType,_Sequenceable): def __init__(self,*arg,**argv): super(Sequence,self).__init__(*arg,**argv) @@ -746,7 +754,7 @@ def associate(self,*tasks): self._tasks.add(task) @staticmethod def _itemIsValid(item): - return isinstance(item,Path) or isinstance(item,EndPath) + return isinstance(item,Path) or isinstance(item,EndPath) or isinstance(item,FinalPath) def copy(self): import copy aCopy = copy.copy(self) diff --git a/FWCore/ParameterSet/python/SequenceVisitors.py b/FWCore/ParameterSet/python/SequenceVisitors.py index 012c67f1a1c28..933345bc0ef96 100644 --- a/FWCore/ParameterSet/python/SequenceVisitors.py +++ b/FWCore/ParameterSet/python/SequenceVisitors.py @@ -67,6 +67,36 @@ def leave(self,visitee): if isinstance(visitee, Task): self._levelInTasks -= 1 +# Use this on EndPaths +class FinalPathValidator(object): + def __init__(self): + self.__label = '' + self._levelInTasks = 0 + self.filtersOnFinalpaths = [] + self.producersOnFinalpaths = [] + def setLabel(self,label): + self.__label = "'"+label+"' " + def enter(self,visitee): + if visitee.isLeaf(): + if isinstance(visitee, _Labelable): + if not visitee.hasLabel_(): + raise ValueError("FinalPath "+self.__label+"contains a module of type '"+visitee.type_()+"' which has\nno assigned label.") + elif isinstance(visitee, Service): + if not visitee._inProcess: + raise ValueError("FinalPath "+self.__label+"contains a service of type '"+visitee.type_()+"' which is not attached to the process.\n") + if isinstance(visitee, Task): + self._levelInTasks += 1 + if self._levelInTasks > 0: + return + if isinstance(visitee,EDFilter): + self.filtersOnFinalpaths.append(visitee.type_()) + if isinstance(visitee,EDProducer): + self.producersOnFinalpaths.append(visitee.type_()) + def leave(self,visitee): + if self._levelInTasks > 0: + if isinstance(visitee, Task): + self._levelInTasks -= 1 + class NodeVisitor(object): """Form sets of all modules, ESProducers, ESSources and Services in visited objects. Can be used to visit Paths, EndPaths, Sequences or Tasks. Includes in sets objects on sub-Sequences and sub-Tasks"""