Skip to content

Commit

Permalink
Merge f7774c1 into b49aa45
Browse files Browse the repository at this point in the history
  • Loading branch information
owulveryck committed Aug 14, 2020
2 parents b49aa45 + f7774c1 commit 471aa57
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 14 deletions.
65 changes: 65 additions & 0 deletions x/dataviz/traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dataviz

import (
"encoding/json"
"io"
"strconv"
"time"

"gorgonia.org/gorgonia"
xvm "gorgonia.org/gorgonia/x/vm"
)

// DumpTrace suitable for https://github.com/vasturiano/timelines-chart
func DumpTrace(traces []xvm.Trace, g *gorgonia.ExprGraph, w io.Writer) error {
var zerotime time.Time
groups := make(map[string]group)
// generate all labels
for _, trace := range traces {
if trace.End == zerotime {
continue
}
if _, ok := groups[trace.StateFunction]; !ok {
groups[trace.StateFunction] = group{
Group: trace.StateFunction,
}
}
label := dataLabel{
TimeRange: []time.Time{
trace.Start,
trace.End,
},
Val: strconv.Itoa(int(trace.ID)),
}
dGroup := dataGroup{
Label: g.Node(trace.ID).(*gorgonia.Node).Name(),
Data: []dataLabel{
label,
},
}
g := groups[trace.StateFunction]
g.Data = append(g.Data, dGroup)
groups[trace.StateFunction] = g
}
grps := make([]group, 0, len(groups))
for _, grp := range groups {
grps = append(grps, grp)
}
enc := json.NewEncoder(w)
return enc.Encode(grps)
}

type group struct {
Group string `json:"group"`
Data []dataGroup `json:"data"`
}

type dataGroup struct {
Label string `json:"label"`
Data []dataLabel `json:"data"`
}

type dataLabel struct {
TimeRange []time.Time `json:"timeRange"`
Val interface{} `json:"val"`
}
30 changes: 30 additions & 0 deletions x/dataviz/traces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package dataviz

import (
"context"
"log"
"os"

"gorgonia.org/gorgonia"
xvm "gorgonia.org/gorgonia/x/vm"
)

func ExampleDumpTrace() {
g := gorgonia.NewGraph()
// Add elements
ctx, traceC := xvm.WithTracing(context.Background())
defer xvm.CloseTracing(ctx)
traces := make([]xvm.Trace, 0)
go func() {
for v := range traceC {
traces = append(traces, v)
}
}()
machine := xvm.NewMachine(g)
err := machine.Run(ctx)
if err != nil {
log.Fatal(err)
}
machine.Close()
DumpTrace(traces, g, os.Stdout)
}
54 changes: 41 additions & 13 deletions x/vm/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package xvm

import (
"context"
"strconv"
"strings"
"time"

"gorgonia.org/gorgonia"
)
Expand Down Expand Up @@ -121,33 +124,58 @@ func (m *Machine) Close() {
}
}

type nodeError struct {
id int64
t time.Time
err error
}

type nodeErrors []nodeError

func (e nodeErrors) Error() string {
var sb strings.Builder
for _, e := range e {
sb.WriteString(strconv.Itoa(int(e.id)))
sb.WriteString(":")
sb.WriteString(e.err.Error())
sb.WriteString("\n")
}
return sb.String()
}

// Run performs the computation
func (m *Machine) runAllNodes(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
errC := make(chan error, 0)
errC := make(chan nodeError, 0)
total := len(m.nodes)
for i := range m.nodes {
go func(n *node) {
errC <- n.Compute(ctx)
err := n.Compute(ctx)
errC <- nodeError{
id: n.id,
t: time.Now(),
err: err,
}
}(m.nodes[i])
}
var err error
for err = range errC {
errs := make([]nodeError, 0)
for e := range errC {
total--
if err != nil || total == 0 {
break
if e.err != nil {
errs = append(errs, e)
// failfast, on error, cancel
cancel()
}
}
for moreChannel := true; moreChannel; {
select {
case <-errC:
default:
moreChannel = false
if total == 0 {
break
}
}
cancel()
close(errC)
return err
if len(errs) != 0 {
return nodeErrors(errs)
}
return nil
}

// GetResult stored in a node
Expand Down
47 changes: 46 additions & 1 deletion x/vm/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xvm

import (
"context"
"errors"
"fmt"
"log"
"reflect"
Expand Down Expand Up @@ -29,6 +30,12 @@ func TestMachine_runAllNodes(t *testing.T) {
outputC: outputC2,
inputC: inputC2,
}
errNode1 := &node{
op: &errorOP{},
inputValues: make([]gorgonia.Value, 2),
outputC: outputC2,
inputC: inputC2,
}
type fields struct {
nodes []*node
pubsubs *pubsub
Expand All @@ -52,7 +59,16 @@ func TestMachine_runAllNodes(t *testing.T) {
},
false,
},
// TODO: Add test cases.
{
"error",
fields{
nodes: []*node{n1, errNode1},
},
args{
context.Background(),
},
true,
},
}
for _, tt := range tests {
forty := gorgonia.F32(40.0)
Expand Down Expand Up @@ -96,6 +112,9 @@ func TestMachine_runAllNodes(t *testing.T) {
if err := m.runAllNodes(tt.args.ctx); (err != nil) != tt.wantErr {
t.Errorf("Machine.runAllNodes() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.wantErr {
return
}
out1 := <-outputC1
out2 := <-outputC2
if !reflect.DeepEqual(out1.Data(), fortyTwo.Data()) {
Expand Down Expand Up @@ -533,3 +552,29 @@ func TestMachine_GetResult(t *testing.T) {
})
}
}

func Test_nodeErrors_Error(t *testing.T) {
tests := []struct {
name string
e nodeErrors
want string
}{
{
"simple",
[]nodeError{
{
id: 0,
err: errors.New("error"),
},
},
"0:error\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.e.Error(); got != tt.want {
t.Errorf("nodeErrors.Error() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 2 additions & 0 deletions x/vm/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func computeBackward(_ context.Context, _ *node) stateFn {

func (n *node) Compute(ctx context.Context) error {
for state := defaultState; state != nil; {
t := trace(ctx, nil, n, state)
state = state(ctx, n)
trace(ctx, t, nil, nil)
}
return n.err
}
Expand Down
6 changes: 6 additions & 0 deletions x/vm/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ func Test_node_ComputeForward(t *testing.T) {
}
}

type errorOP struct{}

func (*errorOP) Do(v ...gorgonia.Value) (gorgonia.Value, error) {
return nil, errors.New("error")
}

type sumF32 struct{}

func (*sumF32) Do(v ...gorgonia.Value) (gorgonia.Value, error) {
Expand Down
71 changes: 71 additions & 0 deletions x/vm/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package xvm

import (
"context"
"reflect"
"runtime"
"time"
)

// Trace the nodes states
type Trace struct {
//fmt.Println(runtime.FuncForPC(reflect.ValueOf(state).Pointer()).Name())
StateFunction string
ID int64
Start time.Time
End time.Time `json:",omitempty"`
}

type chanTracerContextKey int

const (
globalTracerContextKey chanTracerContextKey = 0
)

// WithTracing initializes a tracing channel and adds it to the context
func WithTracing(parent context.Context) (context.Context, <-chan Trace) {
c := make(chan Trace, 0)
return context.WithValue(parent, globalTracerContextKey, c), c
}

// CloseTracing the tracing channel to avoid context leak.
// it is a nil op if context does not carry tracing information
func CloseTracing(ctx context.Context) {
c := extractTracingChannel(ctx)
if c != nil {
close(c)
}
}

func extractTracingChannel(ctx context.Context) chan<- Trace {
if ctx == nil {
return nil
}
if c := ctx.Value(globalTracerContextKey); c != nil {
return c.(chan Trace)
}
return nil
}

var now = time.Now

func trace(ctx context.Context, t *Trace, n *node, state stateFn) *Trace {
traceC := extractTracingChannel(ctx)
if traceC == nil {
return t
}
if t == nil {
t = &Trace{
ID: n.id,
StateFunction: runtime.FuncForPC(reflect.ValueOf(state).Pointer()).Name(),
Start: now(),
}
} else {
t.End = now()
}
select {
case traceC <- *t:
case <-ctx.Done():
}
return t
}

0 comments on commit 471aa57

Please sign in to comment.