Skip to content

Commit

Permalink
Add selected Go files from legacy SciPipe code base
Browse files Browse the repository at this point in the history
  • Loading branch information
samuell committed Jul 5, 2016
1 parent 0483ba9 commit 584d620
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 0 deletions.
6 changes: 6 additions & 0 deletions const.go
@@ -0,0 +1,6 @@
package flowbase

const (
// Standard buffer size used for channels connecting processes
BUFSIZE = 16
)
116 changes: 116 additions & 0 deletions log.go
@@ -0,0 +1,116 @@
package flowbase

import (
"io"
"io/ioutil"
"log"
"os"
)

var (
Trace *log.Logger
Debug *log.Logger
Info *log.Logger
Audit *log.Logger
Warning *log.Logger
Error *log.Logger
LogExists bool
)

// Initiate logging
func InitLog(
traceHandle io.Writer,
debugHandle io.Writer,
infoHandle io.Writer,
auditHandle io.Writer,
warningHandle io.Writer,
errorHandle io.Writer) {

Trace = log.New(traceHandle,
"TRACE ",
log.Ldate|log.Ltime|log.Lshortfile)

Debug = log.New(debugHandle,
"DEBUG ",
log.Ldate|log.Ltime|log.Lshortfile)

Info = log.New(infoHandle,
"INFO ",
log.Ldate|log.Ltime)

// This level is the one suggested to use when running scientific workflows, to retain audit
// information
Audit = log.New(auditHandle,
"AUDIT ",
log.Ldate|log.Ltime)

Warning = log.New(warningHandle,
"WARNING ",
log.Ldate|log.Ltime)

Error = log.New(errorHandle,
"ERROR ",
log.Ldate|log.Ltime)

LogExists = true
}

// Initiate logging with level=DEBUG
func InitLogDebug() {
InitLog(
ioutil.Discard,
os.Stdout,
os.Stdout,
os.Stdout,
os.Stdout,
os.Stderr,
)
}

// Initiate logging with level=INFO
func InitLogInfo() {
InitLog(
ioutil.Discard,
ioutil.Discard,
os.Stdout,
os.Stdout,
os.Stdout,
os.Stderr,
)
}

// Initiate logging with level=AUDIT
func InitLogAudit() {
InitLog(
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
os.Stdout,
os.Stdout,
os.Stderr,
)
}

// Initiate logging with level=WARNING
func InitLogWarning() {
InitLog(
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
os.Stdout,
os.Stderr,
)
}

// Initiate logging with level=ERROR
func InitLogError() {
InitLog(
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
ioutil.Discard,
os.Stderr,
)
}
62 changes: 62 additions & 0 deletions pipeline.go
@@ -0,0 +1,62 @@
package flowbase

import (
"fmt"
"os"
"reflect"
)

type PipelineRunner struct {
processes []Process
}

func NewPipelineRunner() *PipelineRunner {
return &PipelineRunner{}
}

func (pl *PipelineRunner) AddProcess(proc Process) {
pl.processes = append(pl.processes, proc)
}

func (pl *PipelineRunner) AddProcesses(procs ...Process) {
for _, proc := range procs {
pl.AddProcess(proc)
}
}

func (pl *PipelineRunner) PrintProcesses() {
for i, proc := range pl.processes {
fmt.Printf("Process %d: %v\n", i, reflect.TypeOf(proc))
}
}

func (pl *PipelineRunner) Run() {
if !LogExists {
InitLogAudit()
}
if len(pl.processes) == 0 {
Error.Println("PipelineRunner: The PipelineRunner is empty. Did you forget to add the processes to it?")
os.Exit(1)
}
everythingConnected := true
for _, proc := range pl.processes {
if !proc.IsConnected() {
everythingConnected = false
}
}
if !everythingConnected {
Error.Println("PipelineRunner: Pipeline shutting down, since not all ports are connected!")
os.Exit(1)
} else {
for i, proc := range pl.processes {
Debug.Printf("PipelineRunner: Looping over process %d: %v ...\n", i, proc)
if i < len(pl.processes)-1 {
Debug.Printf("PipelineRunner: Starting process %d in new go-routine: %v\n", i, proc)
go proc.Run()
} else {
Debug.Printf("PipelineRunner: Starting process %d: in main go-routine: %v\n", i, proc)
proc.Run()
}
}
}
}
73 changes: 73 additions & 0 deletions pipeline_test.go
@@ -0,0 +1,73 @@
package flowbase

import (
"github.com/stretchr/testify/assert"
t "testing"
)

func TestAddProcesses(t *t.T) {
InitLogError()

proc1 := NewBogusProcess()
proc2 := NewBogusProcess()
pipeline := NewPipelineRunner()
pipeline.AddProcesses(proc1, proc2)

assert.EqualValues(t, len(pipeline.processes), 2)

assert.IsType(t, &BogusProcess{}, pipeline.processes[0], "Process 1 was not of the right type!")
assert.IsType(t, &BogusProcess{}, pipeline.processes[1], "Process 2 was not of the right type!")
}

func TestRunProcessesInPipelineRunner(t *t.T) {
proc1 := NewBogusProcess()
proc2 := NewBogusProcess()

pipeline := NewPipelineRunner()
pipeline.AddProcesses(proc1, proc2)
pipeline.Run()

// Only the last process is supposed to be run by the pipeline directly,
// while the others are only run if an output is pulled on an out-port,
// but since we haven't connected the tasks here, only the last one
// should be ran in this case.
assert.False(t, proc1.WasRun, "Process 1 was run!")
assert.True(t, proc2.WasRun, "Process 2 was not run!")
}

func ExamplePrintProcesses() {
proc1 := NewBogusProcess()
proc2 := NewBogusProcess()

pipeline := NewPipelineRunner()
pipeline.AddProcesses(proc1, proc2)
pipeline.Run()

pipeline.PrintProcesses()
// Output:
// Process 0: *flowbase.BogusProcess
// Process 1: *flowbase.BogusProcess
}

// --------------------------------
// Helper stuff
// --------------------------------

// A process with does just satisfy the Process interface, without doing any
// actual work.
type BogusProcess struct {
Process
WasRun bool
}

func NewBogusProcess() *BogusProcess {
return &BogusProcess{WasRun: false}
}

func (p *BogusProcess) Run() {
p.WasRun = true
}

func (p *BogusProcess) IsConnected() bool {
return true
}
7 changes: 7 additions & 0 deletions process.go
@@ -0,0 +1,7 @@
package flowbase

// Base interface for all processes
type Process interface {
IsConnected() bool
Run()
}
38 changes: 38 additions & 0 deletions sink.go
@@ -0,0 +1,38 @@
package flowbase

type Sink struct {
Process
inPorts []chan interface{}
}

// Instantiate a Sink component
func NewSink() (s *Sink) {
return &Sink{
inPorts: make([]chan interface{}, BUFSIZE),
}
}

func (proc *Sink) Connect(ch chan interface{}) {
proc.inPorts = append(proc.inPorts, ch)
}

// Execute the Sink component
func (proc *Sink) Run() {
ok := true
for len(proc.inPorts) > 0 {
for i, ich := range proc.inPorts {
select {
case _, ok = <-ich:
if !ok {
proc.deleteInPortAtKey(i)
continue
}
default:
}
}
}
}

func (proc *Sink) deleteInPortAtKey(i int) {
proc.inPorts = append(proc.inPorts[:i], proc.inPorts[i+1:]...)
}
33 changes: 33 additions & 0 deletions utils.go
@@ -0,0 +1,33 @@
package flowbase

import (
// "github.com/go-errors/errors"
//"os"
"os"
"os/exec"
re "regexp"
)

func ExecCmd(cmd string) string {
Info.Println("Executing command: ", cmd)
combOutput, err := exec.Command("bash", "-lc", cmd).CombinedOutput()
if err != nil {
Error.Println("Could not execute command `" + cmd + "`: " + string(combOutput))
os.Exit(1)
}
return string(combOutput)
}

func Check(err error) {
if err != nil {
panic(err)
}
}

// Return the regular expression used to parse the place-holder syntax for in-, out- and
// parameter ports, that can be used to instantiate a SciProcess.
func getShellCommandPlaceHolderRegex() *re.Regexp {
r, err := re.Compile("{(o|os|i|is|p):([^{}:]+)}")
Check(err)
return r
}
25 changes: 25 additions & 0 deletions utils_test.go
@@ -0,0 +1,25 @@
package flowbase

import (
"errors"
"testing"
)

func TestExecCmd_EchoFooBar(t *testing.T) {
output := ExecCmd("echo foo bar")
if output != "foo bar\n" {
t.Errorf("output = %swant: foo bar\n", output)
}
}

func TestCheck_Panics(t *testing.T) {
// Recover the panic, and check that the recover "was needed" (r was not
// nil)
defer func() {
if r := recover(); r == nil {
t.Error("The code did not panic as it should!")
}
}()
err := errors.New("A test-error")
Check(err)
}

0 comments on commit 584d620

Please sign in to comment.