From fa71013b47b9db47433c5ad67dab524b47a7ef71 Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 2 Jul 2025 13:48:49 +0100 Subject: [PATCH 1/3] Add walker that passes thread_id to Threaded Visitor, and allow then submitters to have access to thread --- build-support/dist.sh | 3 +++ build.sbt | 4 +-- builtins/cuda.tape | 8 ++++++ src/main/scala/ducttape/cli/ExecuteMode.scala | 6 ++--- src/main/scala/ducttape/exec/Executor.scala | 8 +++--- src/main/scala/ducttape/exec/Submitter.scala | 5 ++-- .../ducttape/exec/UnpackedDagVisitor.scala | 4 +++ .../ducttape/hyperdag/walker/Walker.scala | 6 +++-- .../scala/ducttape/workflow/Visitors.scala | 19 +++++++++++++ .../walker/UnpackedDagWalkerTest.scala | 27 +++++++++++++++++++ version.info | 2 +- 11 files changed, 78 insertions(+), 14 deletions(-) create mode 100644 builtins/cuda.tape diff --git a/build-support/dist.sh b/build-support/dist.sh index c19205dc..71a9127a 100755 --- a/build-support/dist.sh +++ b/build-support/dist.sh @@ -26,6 +26,9 @@ rm -rf ${DIST} mkdir -p ${DIST} cp $DUCTTAPE/$TARGET_JAR ${DIST}/ducttape.jar +# add version.info to the JAR +zip -g ${DIST}/ducttape.jar version.info + fgrep -v DEV-ONLY $DUCTTAPE/ducttape > ${DIST}/ducttape chmod a+x ${DIST}/ducttape cp $DUCTTAPE/tabular ${DIST}/tabular diff --git a/build.sbt b/build.sbt index dfaa823b..ff5c7d9f 100644 --- a/build.sbt +++ b/build.sbt @@ -3,8 +3,8 @@ scalaVersion := "2.12.13"; libraryDependencies += "commons-io" % "commons-io" % "2.4"; libraryDependencies += "com.frugalmechanic" %% "scala-optparse" % "1.1.3"; libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1" % "provided"; -libraryDependencies += "org.slf4j" % "slf4j-api" % "1.6.6"; -libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.6"; +libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.36"; +libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.36"; libraryDependencies += "org.clapper" %% "grizzled-slf4j" % "1.3.0"; libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.0.0"; libraryDependencies += "org.pegdown" % "pegdown" % "1.1.0"; diff --git a/builtins/cuda.tape b/builtins/cuda.tape new file mode 100644 index 00000000..11c9b149 --- /dev/null +++ b/builtins/cuda.tape @@ -0,0 +1,8 @@ +# a simple modification of the shell submitter +# that exports the CUDA_VISIBLE_DEVICES based on the THREAD_ID +submitter cuda_shell :: COMMANDS THREAD_ID { + action run { + export CUDA_VISIBLE_DEVICES=$THREAD_ID + eval "$COMMANDS" + } +} \ No newline at end of file diff --git a/src/main/scala/ducttape/cli/ExecuteMode.scala b/src/main/scala/ducttape/cli/ExecuteMode.scala index cfe57ad7..385418ff 100644 --- a/src/main/scala/ducttape/cli/ExecuteMode.scala +++ b/src/main/scala/ducttape/cli/ExecuteMode.scala @@ -148,9 +148,9 @@ object ExecuteMode { } } try { - Visitors.visitAll(workflow, - new Executor(dirs, packageVersions, planPolicy, locker, workflow, cc.completed, cc.todo, observers=Seq(failObserver)), - planPolicy, committedVersion, opts.jobs(), traversal) + Visitors.visitAllThreaded(workflow, + new Executor(dirs, packageVersions, planPolicy, locker, workflow, cc.completed, cc.todo, observers=Seq(failObserver)), + planPolicy, committedVersion, opts.jobs(), traversal) } catch { case t: Throwable => { System.err.println(s"${Config.errorColor}The following tasks failed:${Config.resetColor}") diff --git a/src/main/scala/ducttape/exec/Executor.scala b/src/main/scala/ducttape/exec/Executor.scala index 8198f832..89dc7b17 100644 --- a/src/main/scala/ducttape/exec/Executor.scala +++ b/src/main/scala/ducttape/exec/Executor.scala @@ -20,13 +20,13 @@ class Executor(val dirs: DirectoryArchitect, val workflow: HyperWorkflow, val alreadyDone: Set[(String,Realization)], val todo: Set[(String,Realization)], - observers: Seq[ExecutionObserver] = Nil) extends UnpackedDagVisitor with Logging { + observers: Seq[ExecutionObserver] = Nil) extends ThreadedDagVisitor with Logging { val submitter = new Submitter(workflow.submitters) observers.foreach(_.init(this)) - override def visit(task: VersionedTask) { + override def visit(task: VersionedTask, threadId: Int) { if (todo( (task.name, task.realization) )) { val taskEnv = new FullTaskEnvironment(dirs, packageVersioner, task) @@ -50,14 +50,14 @@ class Executor(val dirs: DirectoryArchitect, // while we were waiting on the lock if (!CompletionChecker.isComplete(taskEnv)) { - System.err.println(s"Running ${task} in ${taskEnv.where.getAbsolutePath}") + System.err.println(s"Running ${task} in ${taskEnv.where.getAbsolutePath} on thread ${threadId}") observers.foreach(_.begin(this, taskEnv)) Files.mkdirs(taskEnv.where) debug(s"Environment for ${task} is ${taskEnv.env}") // the "run" action of the submitter will throw if the exit code is non-zero - submitter.run(taskEnv) + submitter.run(taskEnv, threadId) def incompleteCallback(task: VersionedTask, msg: String) { System.err.println(s"${task}: ${msg}") diff --git a/src/main/scala/ducttape/exec/Submitter.scala b/src/main/scala/ducttape/exec/Submitter.scala index d845e008..d877269f 100644 --- a/src/main/scala/ducttape/exec/Submitter.scala +++ b/src/main/scala/ducttape/exec/Submitter.scala @@ -25,7 +25,7 @@ import ducttape.workflow.RealTask object Submitter { // some special variables are passed without user intervention - val SPECIAL_VARIABLES = Set("COMMANDS", "TASK_VARIABLES", "TASK", "REALIZATION", "CONFIGURATION") + val SPECIAL_VARIABLES = Set("COMMANDS", "TASK_VARIABLES", "TASK", "REALIZATION", "CONFIGURATION", "THREAD_ID") } class Submitter(submitters: Seq[SubmitterDef]) extends Logging { @@ -68,7 +68,7 @@ class Submitter(submitters: Seq[SubmitterDef]) extends Logging { } } - def run(taskEnv: FullTaskEnvironment) { + def run(taskEnv: FullTaskEnvironment, threadId: Int) { val submitterDef: SubmitterDef = getSubmitter(taskEnv.task) val requiredParams: Set[String] = submitterDef.params.map(_.name).toSet // only include the dot params from the task that are explicitly requested by the submitter @@ -87,6 +87,7 @@ class Submitter(submitters: Seq[SubmitterDef]) extends Logging { ("TASK", taskEnv.task.name), ("REALIZATION", taskEnv.task.realization.toString), ("TASK_VARIABLES", taskEnv.taskVariables), + ("THREAD_ID", threadId.toString), ("COMMANDS", taskEnv.task.commands.toString)) ++ dotParamsEnv ++ taskEnv.env diff --git a/src/main/scala/ducttape/exec/UnpackedDagVisitor.scala b/src/main/scala/ducttape/exec/UnpackedDagVisitor.scala index f6f064de..0ca2aa33 100644 --- a/src/main/scala/ducttape/exec/UnpackedDagVisitor.scala +++ b/src/main/scala/ducttape/exec/UnpackedDagVisitor.scala @@ -14,3 +14,7 @@ trait UnpackedRealDagVisitor { trait UnpackedDagVisitor { def visit(task: VersionedTask) } + +trait ThreadedDagVisitor { + def visit(task: VersionedTask, threadId: Int) +} diff --git a/src/main/scala/ducttape/hyperdag/walker/Walker.scala b/src/main/scala/ducttape/hyperdag/walker/Walker.scala index fc7cc9d5..6ce047a7 100644 --- a/src/main/scala/ducttape/hyperdag/walker/Walker.scala +++ b/src/main/scala/ducttape/hyperdag/walker/Walker.scala @@ -41,7 +41,7 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra // TODO: Add a .par(j) method that returns a parallel walker // j = numCores (as in make -j) - def foreach[U](j: Int, f: A => U) { + def foreach[U](j: Int, f: (A, Int) => U) { import java.util.concurrent._ import collection.JavaConversions._ @@ -70,7 +70,7 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra var success = true try { debug("Executing callback for %s".format(a)) - f(a) + f(a, i) } catch { // catch exceptions happening within the callback case t: Throwable => { @@ -104,4 +104,6 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra // call get on each future so that we propagate any exceptions futures.foreach(_.get) } + + def foreach[U](j: Int, f: A => U): Unit = foreach(j, (a: A, _: Int) => f(a)) } diff --git a/src/main/scala/ducttape/workflow/Visitors.scala b/src/main/scala/ducttape/workflow/Visitors.scala index 74c84f29..afd95085 100644 --- a/src/main/scala/ducttape/workflow/Visitors.scala +++ b/src/main/scala/ducttape/workflow/Visitors.scala @@ -5,6 +5,7 @@ package ducttape.workflow import collection._ import ducttape.exec.UnpackedRealDagVisitor import ducttape.exec.UnpackedDagVisitor +import ducttape.exec.ThreadedDagVisitor import ducttape.versioner.WorkflowVersionInfo import ducttape.workflow.Types.UnpackedWorkVert import ducttape.hyperdag.walker.Traversal @@ -47,4 +48,22 @@ object Visitors extends Logging { }) visitor } + + def visitAllThreaded[A <: ThreadedDagVisitor]( + workflow: HyperWorkflow, + visitor: A, + planPolicy: PlanPolicy, + workflowVersion: WorkflowVersionInfo, + numCores: Int = 1, + traversal: Traversal = Arbitrary): A = { + + debug(s"Visiting workflow using traversal: ${traversal}") + workflow.unpackedWalker(planPolicy, traversal=traversal).foreach(numCores, { (v: UnpackedWorkVert, threadId: Int) => + val taskT: TaskTemplate = v.packed.value.get + val task: VersionedTask = taskT.toRealTask(v).toVersionedTask(workflowVersion) + debug(s"Visiting ${task} on thread ${threadId}") + visitor.visit(task, threadId) + }) + visitor + } } diff --git a/src/test/scala/ducttape/hyperdag/walker/UnpackedDagWalkerTest.scala b/src/test/scala/ducttape/hyperdag/walker/UnpackedDagWalkerTest.scala index f61cea24..18a65943 100644 --- a/src/test/scala/ducttape/hyperdag/walker/UnpackedDagWalkerTest.scala +++ b/src/test/scala/ducttape/hyperdag/walker/UnpackedDagWalkerTest.scala @@ -154,4 +154,31 @@ class UnpackedDagWalkerTest extends FlatSpec with Logging{ } timer.interrupt() } + + it should "traverse a diamond with the threaded walker and pass threadId" in { + val numThreads = 2 + val vertsToThreads = new collection.mutable.HashMap[String, Int]() + val seenThreads = new collection.mutable.HashSet[Int]() + + diamond.unpackedWalker(traversal=Arbitrary).foreach(numThreads, { (v, threadId) => + Thread.sleep(10) // Add a small delay to ensure multiple threads are used + vertsToThreads.synchronized { + vertsToThreads(v.packed.value) = threadId + seenThreads += threadId + } + }) + + // Check that all vertices were visited + val expectedVerts = Set("Vertex A", "Vertex B", "Vertex C", "Vertex D") + assert(vertsToThreads.keySet == expectedVerts, s"Vertices visited: ${vertsToThreads.keySet}") + + // Check that threadIds are in the expected range + assert(seenThreads.subsetOf((0 until numThreads).toSet), s"Thread IDs used: $seenThreads") + + // Check that more than one thread was used + assert(seenThreads.size > 1, s"Expected more than one thread, got: $seenThreads") + + // TODO: Check thread mapping is expected + // Can I make any guarantees about the thread mapping? + } } diff --git a/version.info b/version.info index 79a2734b..5d4294b9 100644 --- a/version.info +++ b/version.info @@ -1 +1 @@ -0.5.0 \ No newline at end of file +0.5.1 \ No newline at end of file From 5dfc64cca2ae7a07cb9de08a02b389d0ea2e2af4 Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 2 Jul 2025 13:58:18 +0100 Subject: [PATCH 2/3] Update CI --- .github/workflows/scala-ci.yaml | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/.github/workflows/scala-ci.yaml b/.github/workflows/scala-ci.yaml index cfc1a32d..978ae2cd 100644 --- a/.github/workflows/scala-ci.yaml +++ b/.github/workflows/scala-ci.yaml @@ -12,11 +12,9 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up JDK 11 - uses: actions/setup-java@v2 + - uses: olafurpg/setup-scala@v11 with: - java-version: '11' - distribution: 'adopt' + java-version: adopt@1.11 - name: Compile run: sbt compile @@ -26,12 +24,10 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up JDK 11 - uses: actions/setup-java@v2 + - uses: olafurpg/setup-scala@v11 with: - java-version: '11' - distribution: 'adopt' - + java-version: adopt@1.11 + - name: Run Unit Tests run: sbt test @@ -41,11 +37,9 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up JDK 11 - uses: actions/setup-java@v2 + - uses: olafurpg/setup-scala@v11 with: - java-version: '11' - distribution: 'adopt' + java-version: adopt@1.11 - name: Assemble and Distribute run: | From 1fc603c04839727e5cadb2edef03a2074d70e8b6 Mon Sep 17 00:00:00 2001 From: Patrick Date: Wed, 2 Jul 2025 14:17:59 +0100 Subject: [PATCH 3/3] fix broken example --- tutorial/02-03-git.tape | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorial/02-03-git.tape b/tutorial/02-03-git.tape index 6ae25bb8..652012a0 100644 --- a/tutorial/02-03-git.tape +++ b/tutorial/02-03-git.tape @@ -16,7 +16,7 @@ task lunchtime : lunchpy { } # * Build commands are only called when versioner indicates a new version -package lunchpy :: .versioner=git .repo="git://github.com/mjdenkowski/lunchpy.git" .ref=HEAD { +package lunchpy :: .versioner=git .repo="https://github.com/mjdenkowski/lunchpy.git" .ref=HEAD { # We don't actually need to compile anything for python code, # but for the sake of example we'll make this program run a bit faster python -m compileall .