From a2de99ce0e9a6cc9926fa7c48fc62173476c5efc Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 22 May 2024 07:54:34 -0400 Subject: [PATCH 1/4] [Go] more API and doc changes - Export Logger, remove DebugLog. - Simplify the doc for Flow. - Unexport more symbols that users shouldn't need. --- go/genkit/action.go | 4 ++-- go/genkit/dev_server.go | 4 ++-- go/genkit/flow.go | 40 ++++++++++++++++++--------------- go/genkit/flow_test.go | 40 +++++++++++++++++++++++++++++++++ go/genkit/gen.go | 10 +++------ go/genkit/logging.go | 9 ++------ go/genkit/metrics.go | 2 +- go/genkit/registry.go | 2 +- go/genkit/schemas.config | 7 +++--- go/genkit/tracing.go | 2 +- go/plugins/localvec/localvec.go | 2 +- 11 files changed, 79 insertions(+), 43 deletions(-) diff --git a/go/genkit/action.go b/go/genkit/action.go index 13661f02c6..0711dee39b 100644 --- a/go/genkit/action.go +++ b/go/genkit/action.go @@ -96,11 +96,11 @@ func (a *Action[I, O, S]) setTracingState(tstate *tracingState) { a.tstate = tst func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback[S]) (output O, err error) { // TODO: validate input against JSONSchema for I. // TODO: validate output against JSONSchema for O. 
- logger(ctx).Debug("Action.Run", + Logger(ctx).Debug("Action.Run", "name", a.name, "input", fmt.Sprintf("%#v", input)) defer func() { - logger(ctx).Debug("Action.Run", + Logger(ctx).Debug("Action.Run", "name", a.name, "output", fmt.Sprintf("%#v", output), "err", err) diff --git a/go/genkit/dev_server.go b/go/genkit/dev_server.go index 0015c4531b..0e13a6fb56 100644 --- a/go/genkit/dev_server.go +++ b/go/genkit/dev_server.go @@ -160,7 +160,7 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro return err } } - logger(ctx).Debug("running action", + Logger(ctx).Debug("running action", "key", body.Key, "stream", stream) var callback StreamingCallback[json.RawMessage] @@ -286,7 +286,7 @@ func writeJSON(ctx context.Context, w http.ResponseWriter, value any) error { } _, err = w.Write(data) if err != nil { - logger(ctx).Error("writing output", "err", err) + Logger(ctx).Error("writing output", "err", err) } return nil } diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 5f43db1b65..339cc194b9 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -33,14 +33,15 @@ import ( // TODO(jba): provide a way to start a Flow from user code. // A Flow is a kind of Action that can be interrupted and resumed. +// (Resumption is an experimental feature in the Javascript implementation, +// and not yet supported in Go.) // -// A Flow[I, O, S] represents a function from I to O (the S parameter is described -// under "Streaming" below). But the function may run in pieces, with interruptions -// and resumptions. (The interruptions discussed here are a part of the flow mechanism, -// not hardware interrupts.) The actual Go function for the flow may be executed -// multiple times, each time making more progress, until finally it completes with -// a value of type O or an error. The mechanism used to achieve this is explained -// below. +// A Flow[I, O, S] represents a function from I to O (the S parameter is for streaming, +// described below). 
But the function may run in pieces, with interruptions and resumptions. +// (The interruptions discussed here are a part of the flow mechanism, not hardware +// interrupts.) The actual Go function for the flow may be executed multiple times, +// each time making more progress, until finally it completes with a value of type +// O or an error. The mechanism used to achieve this is explained below. // // To treat a flow as an action, which is an uninterrupted function execution, we // use different input and output types to capture the additional behavior. The input @@ -60,8 +61,8 @@ import ( // are cached in the flowState. The programmer marks these steps manually, by calling // genkit.Run. // -// A flow computation consists of one or more flow executions. (The FlowExecution -// type records information about these; a flowState holds a slice of FlowExecutions.) +// A flow computation consists of one or more flow executions. (The flowExecution +// type records information about these; a flowState holds a slice of flowExecutions.) // The computation begins with a "start" instruction. If the function is not interrupted, // it will run to completion and the final state will contain its result. If it is // interrupted, state will contain information about how and when it can be resumed. @@ -72,8 +73,6 @@ import ( // "schedule" instruction accomplishes this; the flow is finally started at a later // time by the "runScheduled" instruction. // -// # Streaming -// // Some flows can "stream" their results, providing them incrementally. To do so, // the flow invokes a callback repeatedly. When streaming is complete, the flow // returns a final result in the usual way. @@ -82,6 +81,10 @@ import ( // // Streaming is only supported for the "start" flow instruction. Currently there is // no way to schedule or resume a flow with streaming. + +// A Flow is an Action with additional support for observability and introspection. +// A Flow[I, O, S] represents a function from I to O. 
The S parameter is for +// flows that support streaming: providing their results incrementally. type Flow[I, O, S any] struct { name string // The last component of the flow's key in the registry. fn Func[I, O, S] // The function to run. @@ -171,7 +174,7 @@ type flowState[I, O any] struct { mu sync.Mutex Cache map[string]json.RawMessage `json:"cache,omitempty"` EventsTriggered map[string]any `json:"eventsTriggered,omitempty"` - Executions []*FlowExecution `json:"executions,omitempty"` + Executions []*flowExecution `json:"executions,omitempty"` // The operation is the user-visible part of the state. Operation *Operation[O] `json:"operation,omitempty"` TraceContext string `json:"traceContext,omitempty"` @@ -208,7 +211,7 @@ func (fs *flowState[I, O]) cache() map[string]json.RawMessage { return fs.Cache // An Operation describes the state of a Flow that may still be in progress. type Operation[O any] struct { FlowID string `json:"name,omitempty"` - BlockedOnStep *BlockedOnStep `json:"blockedOnStep,omitempty"` + BlockedOnStep *blockedOnStep `json:"blockedOnStep,omitempty"` // Whether the operation is completed. // If true Result will be non-nil. Done bool `json:"done,omitempty"` @@ -220,7 +223,6 @@ type Operation[O any] struct { // A FlowResult is the result of a flow: either success, in which case Response is // the return value of the flow's function; or failure, in which case Error is the // non-empty error string. -// (Called FlowResponse in the javascript.) type FlowResult[O any] struct { Response O `json:"response,omitempty"` Error string `json:"error,omitempty"` @@ -228,6 +230,8 @@ type FlowResult[O any] struct { StackTrace string `json:"stacktrace,omitempty"` } +// FlowResult is called FlowResponse in the javascript. + // action creates an action for the flow. See the comment at the top of this file for more information. 
func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O], S] { return NewStreamingAction(f.name, nil, func(ctx context.Context, inst *flowInstruction[I], cb StreamingCallback[S]) (*flowState[I, O], error) { @@ -281,11 +285,11 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis defer func() { if err := fctx.finish(ctx); err != nil { // TODO(jba): do something more with this error? - logger(ctx).Error("flowContext.finish", "err", err.Error()) + Logger(ctx).Error("flowContext.finish", "err", err.Error()) } }() ctx = flowContextKey.newContext(ctx, fctx) - exec := &FlowExecution{ + exec := &flowExecution{ StartTime: gtime.ToMilliseconds(time.Now()), } state.mu.Lock() @@ -310,14 +314,14 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis latency := time.Since(start) if err != nil { // TODO(jba): handle InterruptError - logger(ctx).Error("flow failed", + Logger(ctx).Error("flow failed", "path", spanMeta.Path, "err", err.Error(), ) writeFlowFailure(ctx, f.name, latency, err) spanMeta.SetAttr("flow:state", "error") } else { - logger(ctx).Info("flow succeeded", "path", spanMeta.Path) + Logger(ctx).Info("flow succeeded", "path", spanMeta.Path) writeFlowSuccess(ctx, f.name, latency) spanMeta.SetAttr("flow:state", "done") diff --git a/go/genkit/flow_test.go b/go/genkit/flow_test.go index acf4a851e3..cfffbbbc69 100644 --- a/go/genkit/flow_test.go +++ b/go/genkit/flow_test.go @@ -16,6 +16,7 @@ package genkit import ( "context" + "encoding/json" "slices" "testing" @@ -123,3 +124,42 @@ func TestStreamFlow(t *testing.T) { return true }) } + +func TestFlowState(t *testing.T) { + // A flowState is an action output, so it must support JSON marshaling. + // Verify that a fully populated flowState can round-trip via JSON. 
+ + fs := &flowState[int, int]{ + FlowID: "id", + FlowName: "name", + StartTime: 1, + Input: 2, + Cache: map[string]json.RawMessage{"x": json.RawMessage([]byte("3"))}, + EventsTriggered: map[string]any{"a": "b"}, + Executions: []*flowExecution{{StartTime: 4, EndTime: 5, TraceIDs: []string{"c"}}}, + Operation: &Operation[int]{ + FlowID: "id", + BlockedOnStep: &blockedOnStep{Name: "bos", Schema: "s"}, + Done: true, + Metadata: "meta", + Result: &FlowResult[int]{ + Response: 6, + Error: "err", + StackTrace: "st", + }, + }, + TraceContext: "tc", + } + data, err := json.Marshal(fs) + if err != nil { + t.Fatal(err) + } + var got *flowState[int, int] + if err := json.Unmarshal(data, &got); err != nil { + t.Fatal(err) + } + diff := cmp.Diff(fs, got, cmpopts.IgnoreUnexported(flowState[int, int]{})) + if diff != "" { + t.Errorf("mismatch (-want, +got):\n%s", diff) + } +} diff --git a/go/genkit/gen.go b/go/genkit/gen.go index 153ef1620b..402be5584a 100644 --- a/go/genkit/gen.go +++ b/go/genkit/gen.go @@ -23,11 +23,7 @@ type flowError struct { Stacktrace string `json:"stacktrace,omitempty"` } -type FlowInvokeEnvelopeMessageRunScheduled struct { - FlowID string `json:"flowId,omitempty"` -} - -type FlowExecution struct { +type flowExecution struct { // end time in milliseconds since the epoch EndTime gtime.Milliseconds `json:"endTime,omitempty"` // start time in milliseconds since the epoch @@ -35,8 +31,8 @@ type FlowExecution struct { TraceIDs []string `json:"traceIds,omitempty"` } -// BlockedOnStep describes the step of the flow that the flow is blocked on. -type BlockedOnStep struct { +// blockedOnStep describes the step of the flow that the flow is blocked on. 
+type blockedOnStep struct { Name string `json:"name,omitempty"` Schema string `json:"schema,omitempty"` } diff --git a/go/genkit/logging.go b/go/genkit/logging.go index 1b1dc9a3ff..1edbafaa70 100644 --- a/go/genkit/logging.go +++ b/go/genkit/logging.go @@ -31,16 +31,11 @@ func init() { var loggerKey = newContextKey[*slog.Logger]() -// logger returns the Logger in ctx, or the default Logger +// Logger returns the Logger in ctx, or the default Logger // if there is none. -func logger(ctx context.Context) *slog.Logger { +func Logger(ctx context.Context) *slog.Logger { if l := loggerKey.fromContext(ctx); l != nil { return l } return slog.Default() } - -// DebugLog is a helper function for plugins to log debugging info. -func DebugLog(ctx context.Context, msg string, args ...any) { - logger(ctx).Debug(msg, args...) -} diff --git a/go/genkit/metrics.go b/go/genkit/metrics.go index ef79205367..66861d1d28 100644 --- a/go/genkit/metrics.go +++ b/go/genkit/metrics.go @@ -37,7 +37,7 @@ var fetchInstruments = sync.OnceValue(func() *metricInstruments { insts, err := initInstruments() if err != nil { // Do not stop the program because we can't collect metrics. - logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err) + Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err) return nil } return insts diff --git a/go/genkit/registry.go b/go/genkit/registry.go index 94a92ea341..bb755f38b6 100644 --- a/go/genkit/registry.go +++ b/go/genkit/registry.go @@ -67,7 +67,7 @@ func newRegistry() (*registry, error) { return r, nil } -// An Environment is the execution context that the program is running in. +// An Environment is the execution context in which the program is running. 
type Environment string const ( diff --git a/go/genkit/schemas.config b/go/genkit/schemas.config index bad4ecf1d0..c8a4094350 100644 --- a/go/genkit/schemas.config +++ b/go/genkit/schemas.config @@ -46,9 +46,9 @@ GenerationCommonConfig.topK type int GenerateRequestOutputFormat name OutputFormat -OperationBlockedOnStep name BlockedOnStep +OperationBlockedOnStep name blockedOnStep OperationBlockedOnStep doc -BlockedOnStep describes the step of the flow that the flow is blocked on. +blockedOnStep describes the step of the flow that the flow is blocked on. . OutputFormatJson name OutputFormatJSON @@ -66,9 +66,10 @@ FlowInvokeEnvelopeMessageRetry omit FlowInvokeEnvelopeMessageSchedule omit FlowInvokeEnvelopeMessageStart omit FlowInvokeEnvelopeMessageState omit +FlowInvokeEnvelopeMessageRunScheduled omit Operation omit -FlowStateExecution name FlowExecution +FlowStateExecution name flowExecution FlowStateExecution.startTime type gtime.Milliseconds FlowStateExecution.endTime type gtime.Milliseconds diff --git a/go/genkit/tracing.go b/go/genkit/tracing.go index a1facf631b..94b585667f 100644 --- a/go/genkit/tracing.go +++ b/go/genkit/tracing.go @@ -103,7 +103,7 @@ func runInNewSpan[I, O any]( f func(context.Context, I) (O, error), ) (O, error) { // TODO(jba): support span links. 
- log := logger(ctx) + log := Logger(ctx) log.Debug("span start", "name", name) defer log.Debug("span end", "name", name) sm := &spanMetadata{ diff --git a/go/plugins/localvec/localvec.go b/go/plugins/localvec/localvec.go index ad6faa8144..168e02bc51 100644 --- a/go/plugins/localvec/localvec.go +++ b/go/plugins/localvec/localvec.go @@ -110,7 +110,7 @@ func (r *retriever) Index(ctx context.Context, req *ai.IndexerRequest) error { } if _, ok := r.data[id]; ok { - genkit.DebugLog(ctx, "localvec skipping document because already present", "id", id) + genkit.Logger(ctx).Debug("localvec skipping document because already present", "id", id) continue } From f486aaea1d9741347307161e95514204da18b419 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 22 May 2024 08:13:14 -0400 Subject: [PATCH 2/4] replace blockedOnStep with an anonymous struct --- go/genkit/flow.go | 7 +++++-- go/genkit/flow_test.go | 11 +++++++---- go/genkit/gen.go | 6 ------ go/genkit/schemas.config | 5 +---- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 339cc194b9..328f7b3f3c 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -210,8 +210,11 @@ func (fs *flowState[I, O]) cache() map[string]json.RawMessage { return fs.Cache // An Operation describes the state of a Flow that may still be in progress. type Operation[O any] struct { - FlowID string `json:"name,omitempty"` - BlockedOnStep *blockedOnStep `json:"blockedOnStep,omitempty"` + FlowID string `json:"name,omitempty"` + BlockedOnStep *struct { + Name string `json:"name"` + Schema string `json:"schema"` + } `json:"blockedOnStep,omitempty"` // Whether the operation is completed. // If true Result will be non-nil. 
Done bool `json:"done,omitempty"` diff --git a/go/genkit/flow_test.go b/go/genkit/flow_test.go index cfffbbbc69..f5153311cc 100644 --- a/go/genkit/flow_test.go +++ b/go/genkit/flow_test.go @@ -138,10 +138,13 @@ func TestFlowState(t *testing.T) { EventsTriggered: map[string]any{"a": "b"}, Executions: []*flowExecution{{StartTime: 4, EndTime: 5, TraceIDs: []string{"c"}}}, Operation: &Operation[int]{ - FlowID: "id", - BlockedOnStep: &blockedOnStep{Name: "bos", Schema: "s"}, - Done: true, - Metadata: "meta", + FlowID: "id", + BlockedOnStep: &struct { + Name string `json:"name"` + Schema string `json:"schema"` + }{Name: "bos", Schema: "s"}, + Done: true, + Metadata: "meta", Result: &FlowResult[int]{ Response: 6, Error: "err", diff --git a/go/genkit/gen.go b/go/genkit/gen.go index 402be5584a..2a64ee8c10 100644 --- a/go/genkit/gen.go +++ b/go/genkit/gen.go @@ -30,9 +30,3 @@ type flowExecution struct { StartTime gtime.Milliseconds `json:"startTime,omitempty"` TraceIDs []string `json:"traceIds,omitempty"` } - -// blockedOnStep describes the step of the flow that the flow is blocked on. -type blockedOnStep struct { - Name string `json:"name,omitempty"` - Schema string `json:"schema,omitempty"` -} diff --git a/go/genkit/schemas.config b/go/genkit/schemas.config index c8a4094350..5152ff3a47 100644 --- a/go/genkit/schemas.config +++ b/go/genkit/schemas.config @@ -46,10 +46,7 @@ GenerationCommonConfig.topK type int GenerateRequestOutputFormat name OutputFormat -OperationBlockedOnStep name blockedOnStep -OperationBlockedOnStep doc -blockedOnStep describes the step of the flow that the flow is blocked on. -. 
+OperationBlockedOnStep omit OutputFormatJson name OutputFormatJSON From bda8eb114577f6f87d2407f1b6c765156b8696c4 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 22 May 2024 08:16:24 -0400 Subject: [PATCH 3/4] add doc --- go/genkit/flow.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 328f7b3f3c..5bbaa4fa04 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -210,7 +210,8 @@ func (fs *flowState[I, O]) cache() map[string]json.RawMessage { return fs.Cache // An Operation describes the state of a Flow that may still be in progress. type Operation[O any] struct { - FlowID string `json:"name,omitempty"` + FlowID string `json:"name,omitempty"` + // The step that the flow is blocked on, if any. BlockedOnStep *struct { Name string `json:"name"` Schema string `json:"schema"` From 8f281e7e91793731780ec52229caef0c031b0baa Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 22 May 2024 08:32:39 -0400 Subject: [PATCH 4/4] unexport Operation --- go/genkit/flow.go | 10 +++++----- go/genkit/flow_test.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 5bbaa4fa04..5839e5b6a9 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -176,7 +176,7 @@ type flowState[I, O any] struct { EventsTriggered map[string]any `json:"eventsTriggered,omitempty"` Executions []*flowExecution `json:"executions,omitempty"` // The operation is the user-visible part of the state. 
- Operation *Operation[O] `json:"operation,omitempty"` + Operation *operation[O] `json:"operation,omitempty"` TraceContext string `json:"traceContext,omitempty"` } @@ -187,7 +187,7 @@ func newFlowState[I, O any](id, name string, input I) *flowState[I, O] { Input: input, StartTime: gtime.ToMilliseconds(time.Now()), Cache: map[string]json.RawMessage{}, - Operation: &Operation[O]{ + Operation: &operation[O]{ FlowID: id, Done: false, }, @@ -208,8 +208,8 @@ func (fs *flowState[I, O]) lock() { fs.mu.Lock() } func (fs *flowState[I, O]) unlock() { fs.mu.Unlock() } func (fs *flowState[I, O]) cache() map[string]json.RawMessage { return fs.Cache } -// An Operation describes the state of a Flow that may still be in progress. -type Operation[O any] struct { +// An operation describes the state of a Flow that may still be in progress. +type operation[O any] struct { FlowID string `json:"name,omitempty"` // The step that the flow is blocked on, if any. BlockedOnStep *struct { @@ -524,7 +524,7 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *Flow[I, O, S], input I) } } -func finishedOpResponse[O any](op *Operation[O]) (O, error) { +func finishedOpResponse[O any](op *operation[O]) (O, error) { if !op.Done { return internal.Zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID) } diff --git a/go/genkit/flow_test.go b/go/genkit/flow_test.go index f5153311cc..0d24b134ce 100644 --- a/go/genkit/flow_test.go +++ b/go/genkit/flow_test.go @@ -40,13 +40,13 @@ func TestFlowStart(t *testing.T) { t.Fatal(err) } got := state.Operation - want := &Operation[int]{ + want := &operation[int]{ Done: true, Result: &FlowResult[int]{ Response: 2, }, } - if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(Operation[int]{}, "FlowID")); diff != "" { + if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(operation[int]{}, "FlowID")); diff != "" { t.Errorf("mismatch (-want, +got):\n%s", diff) } } @@ -137,7 +137,7 @@ func TestFlowState(t *testing.T) { Cache: 
map[string]json.RawMessage{"x": json.RawMessage([]byte("3"))}, EventsTriggered: map[string]any{"a": "b"}, Executions: []*flowExecution{{StartTime: 4, EndTime: 5, TraceIDs: []string{"c"}}}, - Operation: &Operation[int]{ + Operation: &operation[int]{ FlowID: "id", BlockedOnStep: &struct { Name string `json:"name"`