diff --git a/const.go b/const.go new file mode 100644 index 0000000..94f829c --- /dev/null +++ b/const.go @@ -0,0 +1,6 @@ +package flowbase + +const ( + // Standard buffer size used for channels connecting processes + BUFSIZE = 16 +) diff --git a/log.go b/log.go new file mode 100644 index 0000000..b08662d --- /dev/null +++ b/log.go @@ -0,0 +1,116 @@ +package flowbase + +import ( + "io" + "io/ioutil" + "log" + "os" +) + +var ( + Trace *log.Logger + Debug *log.Logger + Info *log.Logger + Audit *log.Logger + Warning *log.Logger + Error *log.Logger + LogExists bool +) + +// Initiate logging +func InitLog( + traceHandle io.Writer, + debugHandle io.Writer, + infoHandle io.Writer, + auditHandle io.Writer, + warningHandle io.Writer, + errorHandle io.Writer) { + + Trace = log.New(traceHandle, + "TRACE ", + log.Ldate|log.Ltime|log.Lshortfile) + + Debug = log.New(debugHandle, + "DEBUG ", + log.Ldate|log.Ltime|log.Lshortfile) + + Info = log.New(infoHandle, + "INFO ", + log.Ldate|log.Ltime) + + // This level is the one suggested to use when running scientific workflows, to retain audit + // information + Audit = log.New(auditHandle, + "AUDIT ", + log.Ldate|log.Ltime) + + Warning = log.New(warningHandle, + "WARNING ", + log.Ldate|log.Ltime) + + Error = log.New(errorHandle, + "ERROR ", + log.Ldate|log.Ltime) + + LogExists = true +} + +// Initiate logging with level=DEBUG +func InitLogDebug() { + InitLog( + ioutil.Discard, + os.Stdout, + os.Stdout, + os.Stdout, + os.Stdout, + os.Stderr, + ) +} + +// Initiate logging with level=INFO +func InitLogInfo() { + InitLog( + ioutil.Discard, + ioutil.Discard, + os.Stdout, + os.Stdout, + os.Stdout, + os.Stderr, + ) +} + +// Initiate logging with level=AUDIT +func InitLogAudit() { + InitLog( + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + os.Stdout, + os.Stdout, + os.Stderr, + ) +} + +// Initiate logging with level=WARNING +func InitLogWarning() { + InitLog( + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + os.Stdout, + os.Stderr, + ) +} + +// Initiate logging with level=ERROR +func InitLogError() { + InitLog( + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + ioutil.Discard, + os.Stderr, + ) +} diff --git a/pipeline.go b/pipeline.go new file mode 100644 index 0000000..5a1f6fa --- /dev/null +++ b/pipeline.go @@ -0,0 +1,62 @@ +package flowbase + +import ( + "fmt" + "os" + "reflect" +) + +type PipelineRunner struct { + processes []Process +} + +func NewPipelineRunner() *PipelineRunner { + return &PipelineRunner{} +} + +func (pl *PipelineRunner) AddProcess(proc Process) { + pl.processes = append(pl.processes, proc) +} + +func (pl *PipelineRunner) AddProcesses(procs ...Process) { + for _, proc := range procs { + pl.AddProcess(proc) + } +} + +func (pl *PipelineRunner) PrintProcesses() { + for i, proc := range pl.processes { + fmt.Printf("Process %d: %v\n", i, reflect.TypeOf(proc)) + } +} + +func (pl *PipelineRunner) Run() { + if !LogExists { + InitLogAudit() + } + if len(pl.processes) == 0 { + Error.Println("PipelineRunner: The PipelineRunner is empty. Did you forget to add the processes to it?") + os.Exit(1) + } + everythingConnected := true + for _, proc := range pl.processes { + if !proc.IsConnected() { + everythingConnected = false + } + } + if !everythingConnected { + Error.Println("PipelineRunner: Pipeline shutting down, since not all ports are connected!") + os.Exit(1) + } else { + for i, proc := range pl.processes { + Debug.Printf("PipelineRunner: Looping over process %d: %v ...\n", i, proc) + if i < len(pl.processes)-1 { + Debug.Printf("PipelineRunner: Starting process %d in new go-routine: %v\n", i, proc) + go proc.Run() + } else { + Debug.Printf("PipelineRunner: Starting process %d: in main go-routine: %v\n", i, proc) + proc.Run() + } + } + } +} diff --git a/pipeline_test.go b/pipeline_test.go new file mode 100644 index 0000000..1efba94 --- /dev/null +++ b/pipeline_test.go @@ -0,0 +1,73 @@ +package flowbase + +import ( + "github.com/stretchr/testify/assert" + t "testing" +) + +func TestAddProcesses(t *t.T) { + InitLogError() + + proc1 := NewBogusProcess() + proc2 := NewBogusProcess() + pipeline := NewPipelineRunner() + pipeline.AddProcesses(proc1, proc2) + + assert.EqualValues(t, len(pipeline.processes), 2) + + assert.IsType(t, &BogusProcess{}, pipeline.processes[0], "Process 1 was not of the right type!") + assert.IsType(t, &BogusProcess{}, pipeline.processes[1], "Process 2 was not of the right type!") +} + +func TestRunProcessesInPipelineRunner(t *t.T) { + proc1 := NewBogusProcess() + proc2 := NewBogusProcess() + + pipeline := NewPipelineRunner() + pipeline.AddProcesses(proc1, proc2) + pipeline.Run() + + // Only the last process is supposed to be run by the pipeline directly, + // while the others are only run if an output is pulled on an out-port, + // but since we haven't connected the tasks here, only the last one + // should be ran in this case. + assert.False(t, proc1.WasRun, "Process 1 was run!") + assert.True(t, proc2.WasRun, "Process 2 was not run!") +} + +func ExamplePrintProcesses() { + proc1 := NewBogusProcess() + proc2 := NewBogusProcess() + + pipeline := NewPipelineRunner() + pipeline.AddProcesses(proc1, proc2) + pipeline.Run() + + pipeline.PrintProcesses() + // Output: + // Process 0: *flowbase.BogusProcess + // Process 1: *flowbase.BogusProcess +} + +// -------------------------------- +// Helper stuff +// -------------------------------- + +// A process with does just satisfy the Process interface, without doing any +// actual work. +type BogusProcess struct { + Process + WasRun bool +} + +func NewBogusProcess() *BogusProcess { + return &BogusProcess{WasRun: false} +} + +func (p *BogusProcess) Run() { + p.WasRun = true +} + +func (p *BogusProcess) IsConnected() bool { + return true +} diff --git a/process.go b/process.go new file mode 100644 index 0000000..0d98413 --- /dev/null +++ b/process.go @@ -0,0 +1,7 @@ +package flowbase + +// Base interface for all processes +type Process interface { + IsConnected() bool + Run() +} diff --git a/sink.go b/sink.go new file mode 100644 index 0000000..537f257 --- /dev/null +++ b/sink.go @@ -0,0 +1,38 @@ +package flowbase + +type Sink struct { + Process + inPorts []chan interface{} +} + +// Instantiate a Sink component +func NewSink() (s *Sink) { + return &Sink{ + inPorts: make([]chan interface{}, BUFSIZE), + } +} + +func (proc *Sink) Connect(ch chan interface{}) { + proc.inPorts = append(proc.inPorts, ch) +} + +// Execute the Sink component +func (proc *Sink) Run() { + ok := true + for len(proc.inPorts) > 0 { + for i, ich := range proc.inPorts { + select { + case _, ok = <-ich: + if !ok { + proc.deleteInPortAtKey(i) + continue + } + default: + } + } + } +} + +func (proc *Sink) deleteInPortAtKey(i int) { + proc.inPorts = append(proc.inPorts[:i], proc.inPorts[i+1:]...) +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..52a95d5 --- /dev/null +++ b/utils.go @@ -0,0 +1,33 @@ +package flowbase + +import ( + // "github.com/go-errors/errors" + //"os" + "os" + "os/exec" + re "regexp" +) + +func ExecCmd(cmd string) string { + Info.Println("Executing command: ", cmd) + combOutput, err := exec.Command("bash", "-lc", cmd).CombinedOutput() + if err != nil { + Error.Println("Could not execute command `" + cmd + "`: " + string(combOutput)) + os.Exit(1) + } + return string(combOutput) +} + +func Check(err error) { + if err != nil { + panic(err) + } +} + +// Return the regular expression used to parse the place-holder syntax for in-, out- and +// parameter ports, that can be used to instantiate a SciProcess. +func getShellCommandPlaceHolderRegex() *re.Regexp { + r, err := re.Compile("{(o|os|i|is|p):([^{}:]+)}") + Check(err) + return r +} diff --git a/utils_test.go b/utils_test.go new file mode 100644 index 0000000..3473262 --- /dev/null +++ b/utils_test.go @@ -0,0 +1,25 @@ +package flowbase + +import ( + "errors" + "testing" +) + +func TestExecCmd_EchoFooBar(t *testing.T) { + output := ExecCmd("echo foo bar") + if output != "foo bar\n" { + t.Errorf("output = %swant: foo bar\n", output) + } +} + +func TestCheck_Panics(t *testing.T) { + // Recover the panic, and check that the recover "was needed" (r was not + // nil) + defer func() { + if r := recover(); r == nil { + t.Error("The code did not panic as it should!") + } + }() + err := errors.New("A test-error") + Check(err) +}