Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions .github/workflows/scala-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
- uses: olafurpg/setup-scala@v11
with:
java-version: '11'
distribution: 'adopt'
java-version: adopt@1.11

- name: Compile
run: sbt compile
Expand All @@ -26,12 +24,10 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
- uses: olafurpg/setup-scala@v11
with:
java-version: '11'
distribution: 'adopt'

java-version: adopt@1.11

- name: Run Unit Tests
run: sbt test

Expand All @@ -41,11 +37,9 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
- uses: olafurpg/setup-scala@v11
with:
java-version: '11'
distribution: 'adopt'
java-version: adopt@1.11

- name: Assemble and Distribute
run: |
Expand Down
3 changes: 3 additions & 0 deletions build-support/dist.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ rm -rf ${DIST}
mkdir -p ${DIST}
cp $DUCTTAPE/$TARGET_JAR ${DIST}/ducttape.jar

# add version.info to the JAR
zip -g ${DIST}/ducttape.jar version.info

fgrep -v DEV-ONLY $DUCTTAPE/ducttape > ${DIST}/ducttape
chmod a+x ${DIST}/ducttape
cp $DUCTTAPE/tabular ${DIST}/tabular
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ scalaVersion := "2.12.13";
libraryDependencies += "commons-io" % "commons-io" % "2.4";
libraryDependencies += "com.frugalmechanic" %% "scala-optparse" % "1.1.3";
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1" % "provided";
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.6.6";
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.6";
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.36";
libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.36";
libraryDependencies += "org.clapper" %% "grizzled-slf4j" % "1.3.0";
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.0.0";
libraryDependencies += "org.pegdown" % "pegdown" % "1.1.0";
Expand Down
8 changes: 8 additions & 0 deletions builtins/cuda.tape
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# a simple modification of the shell submitter
# that exports the CUDA_VISIBLE_DEVICES based on the THREAD_ID
submitter cuda_shell :: COMMANDS THREAD_ID {
action run {
export CUDA_VISIBLE_DEVICES=$THREAD_ID
eval "$COMMANDS"
}
}
6 changes: 3 additions & 3 deletions src/main/scala/ducttape/cli/ExecuteMode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ object ExecuteMode {
}
}
try {
Visitors.visitAll(workflow,
new Executor(dirs, packageVersions, planPolicy, locker, workflow, cc.completed, cc.todo, observers=Seq(failObserver)),
planPolicy, committedVersion, opts.jobs(), traversal)
Visitors.visitAllThreaded(workflow,
new Executor(dirs, packageVersions, planPolicy, locker, workflow, cc.completed, cc.todo, observers=Seq(failObserver)),
planPolicy, committedVersion, opts.jobs(), traversal)
} catch {
case t: Throwable => {
System.err.println(s"${Config.errorColor}The following tasks failed:${Config.resetColor}")
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/ducttape/exec/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ class Executor(val dirs: DirectoryArchitect,
val workflow: HyperWorkflow,
val alreadyDone: Set[(String,Realization)],
val todo: Set[(String,Realization)],
observers: Seq[ExecutionObserver] = Nil) extends UnpackedDagVisitor with Logging {
observers: Seq[ExecutionObserver] = Nil) extends ThreadedDagVisitor with Logging {

val submitter = new Submitter(workflow.submitters)

observers.foreach(_.init(this))

override def visit(task: VersionedTask) {
override def visit(task: VersionedTask, threadId: Int) {
if (todo( (task.name, task.realization) )) {

val taskEnv = new FullTaskEnvironment(dirs, packageVersioner, task)
Expand All @@ -50,14 +50,14 @@ class Executor(val dirs: DirectoryArchitect,
// while we were waiting on the lock
if (!CompletionChecker.isComplete(taskEnv)) {

System.err.println(s"Running ${task} in ${taskEnv.where.getAbsolutePath}")
System.err.println(s"Running ${task} in ${taskEnv.where.getAbsolutePath} on thread ${threadId}")
observers.foreach(_.begin(this, taskEnv))

Files.mkdirs(taskEnv.where)
debug(s"Environment for ${task} is ${taskEnv.env}")

// the "run" action of the submitter will throw if the exit code is non-zero
submitter.run(taskEnv)
submitter.run(taskEnv, threadId)

def incompleteCallback(task: VersionedTask, msg: String) {
System.err.println(s"${task}: ${msg}")
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/ducttape/exec/Submitter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import ducttape.workflow.RealTask

object Submitter {
// some special variables are passed without user intervention
val SPECIAL_VARIABLES = Set("COMMANDS", "TASK_VARIABLES", "TASK", "REALIZATION", "CONFIGURATION")
val SPECIAL_VARIABLES = Set("COMMANDS", "TASK_VARIABLES", "TASK", "REALIZATION", "CONFIGURATION", "THREAD_ID")
}

class Submitter(submitters: Seq[SubmitterDef]) extends Logging {
Expand Down Expand Up @@ -68,7 +68,7 @@ class Submitter(submitters: Seq[SubmitterDef]) extends Logging {
}
}

def run(taskEnv: FullTaskEnvironment) {
def run(taskEnv: FullTaskEnvironment, threadId: Int) {
val submitterDef: SubmitterDef = getSubmitter(taskEnv.task)
val requiredParams: Set[String] = submitterDef.params.map(_.name).toSet
// only include the dot params from the task that are explicitly requested by the submitter
Expand All @@ -87,6 +87,7 @@ class Submitter(submitters: Seq[SubmitterDef]) extends Logging {
("TASK", taskEnv.task.name),
("REALIZATION", taskEnv.task.realization.toString),
("TASK_VARIABLES", taskEnv.taskVariables),
("THREAD_ID", threadId.toString),
("COMMANDS", taskEnv.task.commands.toString)) ++
dotParamsEnv ++ taskEnv.env

Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/ducttape/exec/UnpackedDagVisitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ trait UnpackedRealDagVisitor {
trait UnpackedDagVisitor {
def visit(task: VersionedTask)
}

trait ThreadedDagVisitor {
def visit(task: VersionedTask, threadId: Int)
}
6 changes: 4 additions & 2 deletions src/main/scala/ducttape/hyperdag/walker/Walker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra

// TODO: Add a .par(j) method that returns a parallel walker
// j = numCores (as in make -j)
def foreach[U](j: Int, f: A => U) {
def foreach[U](j: Int, f: (A, Int) => U) {
import java.util.concurrent._
import collection.JavaConversions._

Expand Down Expand Up @@ -70,7 +70,7 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra
var success = true
try {
debug("Executing callback for %s".format(a))
f(a)
f(a, i)
} catch {
// catch exceptions happening within the callback
case t: Throwable => {
Expand Down Expand Up @@ -104,4 +104,6 @@ trait Walker[A] extends Iterable[A] with Logging { // TODO: Should this be a Tra
// call get on each future so that we propagate any exceptions
futures.foreach(_.get)
}

def foreach[U](j: Int, f: A => U): Unit = foreach(j, (a: A, _: Int) => f(a))
}
19 changes: 19 additions & 0 deletions src/main/scala/ducttape/workflow/Visitors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ducttape.workflow
import collection._
import ducttape.exec.UnpackedRealDagVisitor
import ducttape.exec.UnpackedDagVisitor
import ducttape.exec.ThreadedDagVisitor
import ducttape.versioner.WorkflowVersionInfo
import ducttape.workflow.Types.UnpackedWorkVert
import ducttape.hyperdag.walker.Traversal
Expand Down Expand Up @@ -47,4 +48,22 @@ object Visitors extends Logging {
})
visitor
}

def visitAllThreaded[A <: ThreadedDagVisitor](
workflow: HyperWorkflow,
visitor: A,
planPolicy: PlanPolicy,
workflowVersion: WorkflowVersionInfo,
numCores: Int = 1,
traversal: Traversal = Arbitrary): A = {

debug(s"Visiting workflow using traversal: ${traversal}")
workflow.unpackedWalker(planPolicy, traversal=traversal).foreach(numCores, { (v: UnpackedWorkVert, threadId: Int) =>
val taskT: TaskTemplate = v.packed.value.get
val task: VersionedTask = taskT.toRealTask(v).toVersionedTask(workflowVersion)
debug(s"Visiting ${task} on thread ${threadId}")
visitor.visit(task, threadId)
})
visitor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,31 @@ class UnpackedDagWalkerTest extends FlatSpec with Logging{
}
timer.interrupt()
}

it should "traverse a diamond with the threaded walker and pass threadId" in {
val numThreads = 2
val vertsToThreads = new collection.mutable.HashMap[String, Int]()
val seenThreads = new collection.mutable.HashSet[Int]()

diamond.unpackedWalker(traversal=Arbitrary).foreach(numThreads, { (v, threadId) =>
Thread.sleep(10) // Add a small delay to ensure multiple threads are used
vertsToThreads.synchronized {
vertsToThreads(v.packed.value) = threadId
seenThreads += threadId
}
})

// Check that all vertices were visited
val expectedVerts = Set("Vertex A", "Vertex B", "Vertex C", "Vertex D")
assert(vertsToThreads.keySet == expectedVerts, s"Vertices visited: ${vertsToThreads.keySet}")

// Check that threadIds are in the expected range
assert(seenThreads.subsetOf((0 until numThreads).toSet), s"Thread IDs used: $seenThreads")

// Check that more than one thread was used
assert(seenThreads.size > 1, s"Expected more than one thread, got: $seenThreads")

// TODO: Check thread mapping is expected
// Can I make any guarantees about the thread mapping?
}
}
2 changes: 1 addition & 1 deletion tutorial/02-03-git.tape
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ task lunchtime : lunchpy {
}

# * Build commands are only called when versioner indicates a new version
package lunchpy :: .versioner=git .repo="git://github.com/mjdenkowski/lunchpy.git" .ref=HEAD {
package lunchpy :: .versioner=git .repo="https://github.com/mjdenkowski/lunchpy.git" .ref=HEAD {
# We don't actually need to compile anything for python code,
# but for the sake of example we'll make this program run a bit faster
python -m compileall .
Expand Down
2 changes: 1 addition & 1 deletion version.info
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.0
0.5.1