Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/genkit/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ func (a *Action[I, O, S]) setTracingState(tstate *tracingState) { a.tstate = tst
func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback[S]) (output O, err error) {
// TODO: validate input against JSONSchema for I.
// TODO: validate output against JSONSchema for O.
logger(ctx).Debug("Action.Run",
Logger(ctx).Debug("Action.Run",
"name", a.name,
"input", fmt.Sprintf("%#v", input))
defer func() {
logger(ctx).Debug("Action.Run",
Logger(ctx).Debug("Action.Run",
"name", a.name,
"output", fmt.Sprintf("%#v", output),
"err", err)
Expand Down
4 changes: 2 additions & 2 deletions go/genkit/dev_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro
return err
}
}
logger(ctx).Debug("running action",
Logger(ctx).Debug("running action",
"key", body.Key,
"stream", stream)
var callback StreamingCallback[json.RawMessage]
Expand Down Expand Up @@ -286,7 +286,7 @@ func writeJSON(ctx context.Context, w http.ResponseWriter, value any) error {
}
_, err = w.Write(data)
if err != nil {
logger(ctx).Error("writing output", "err", err)
Logger(ctx).Error("writing output", "err", err)
}
return nil
}
56 changes: 32 additions & 24 deletions go/genkit/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ import (
// TODO(jba): provide a way to start a Flow from user code.

// A Flow is a kind of Action that can be interrupted and resumed.
// (Resumption is an experimental feature in the Javascript implementation,
// and not yet supported in Go.)
//
// A Flow[I, O, S] represents a function from I to O (the S parameter is described
// under "Streaming" below). But the function may run in pieces, with interruptions
// and resumptions. (The interruptions discussed here are a part of the flow mechanism,
// not hardware interrupts.) The actual Go function for the flow may be executed
// multiple times, each time making more progress, until finally it completes with
// a value of type O or an error. The mechanism used to achieve this is explained
// below.
// A Flow[I, O, S] represents a function from I to O (the S parameter is for streaming,
// described below). But the function may run in pieces, with interruptions and resumptions.
// (The interruptions discussed here are a part of the flow mechanism, not hardware
// interrupts.) The actual Go function for the flow may be executed multiple times,
// each time making more progress, until finally it completes with a value of type
// O or an error. The mechanism used to achieve this is explained below.
//
// To treat a flow as an action, which is an uninterrupted function execution, we
// use different input and output types to capture the additional behavior. The input
Expand All @@ -60,8 +61,8 @@ import (
// are cached in the flowState. The programmer marks these steps manually, by calling
// genkit.Run.
//
// A flow computation consists of one or more flow executions. (The FlowExecution
// type records information about these; a flowState holds a slice of FlowExecutions.)
// A flow computation consists of one or more flow executions. (The flowExecution
// type records information about these; a flowState holds a slice of flowExecutions.)
// The computation begins with a "start" instruction. If the function is not interrupted,
// it will run to completion and the final state will contain its result. If it is
// interrupted, state will contain information about how and when it can be resumed.
Expand All @@ -72,8 +73,6 @@ import (
// "schedule" instruction accomplishes this; the flow is finally started at a later
// time by the "runScheduled" instruction.
//
// # Streaming
//
// Some flows can "stream" their results, providing them incrementally. To do so,
// the flow invokes a callback repeatedly. When streaming is complete, the flow
// returns a final result in the usual way.
Expand All @@ -82,6 +81,10 @@ import (
//
// Streaming is only supported for the "start" flow instruction. Currently there is
// no way to schedule or resume a flow with streaming.

// A Flow is an Action with additional support for observability and introspection.
// A Flow[I, O, S] represents a function from I to O. The S parameter is for
// flows that support streaming: providing their results incrementally.
type Flow[I, O, S any] struct {
name string // The last component of the flow's key in the registry.
fn Func[I, O, S] // The function to run.
Expand Down Expand Up @@ -171,9 +174,9 @@ type flowState[I, O any] struct {
mu sync.Mutex
Cache map[string]json.RawMessage `json:"cache,omitempty"`
EventsTriggered map[string]any `json:"eventsTriggered,omitempty"`
Executions []*FlowExecution `json:"executions,omitempty"`
Executions []*flowExecution `json:"executions,omitempty"`
// The operation is the user-visible part of the state.
Operation *Operation[O] `json:"operation,omitempty"`
Operation *operation[O] `json:"operation,omitempty"`
TraceContext string `json:"traceContext,omitempty"`
}

Expand All @@ -184,7 +187,7 @@ func newFlowState[I, O any](id, name string, input I) *flowState[I, O] {
Input: input,
StartTime: gtime.ToMilliseconds(time.Now()),
Cache: map[string]json.RawMessage{},
Operation: &Operation[O]{
Operation: &operation[O]{
FlowID: id,
Done: false,
},
Expand All @@ -205,10 +208,14 @@ func (fs *flowState[I, O]) lock() { fs.mu.Lock() }
func (fs *flowState[I, O]) unlock() { fs.mu.Unlock() }
func (fs *flowState[I, O]) cache() map[string]json.RawMessage { return fs.Cache }

// An Operation describes the state of a Flow that may still be in progress.
type Operation[O any] struct {
FlowID string `json:"name,omitempty"`
BlockedOnStep *BlockedOnStep `json:"blockedOnStep,omitempty"`
// An operation describes the state of a Flow that may still be in progress.
type operation[O any] struct {
FlowID string `json:"name,omitempty"`
// The step that the flow is blocked on, if any.
BlockedOnStep *struct {
Name string `json:"name"`
Schema string `json:"schema"`
} `json:"blockedOnStep,omitempty"`
// Whether the operation is completed.
// If true Result will be non-nil.
Done bool `json:"done,omitempty"`
Expand All @@ -220,14 +227,15 @@ type Operation[O any] struct {
// A FlowResult is the result of a flow: either success, in which case Response is
// the return value of the flow's function; or failure, in which case Error is the
// non-empty error string.
// (Called FlowResponse in the javascript.)
type FlowResult[O any] struct {
Response O `json:"response,omitempty"`
Error string `json:"error,omitempty"`
// TODO(jba): keep the actual error around so that RunFlow can use it.
StackTrace string `json:"stacktrace,omitempty"`
}

// FlowResult is called FlowResponse in the javascript.

// action creates an action for the flow. See the comment at the top of this file for more information.
func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O], S] {
return NewStreamingAction(f.name, nil, func(ctx context.Context, inst *flowInstruction[I], cb StreamingCallback[S]) (*flowState[I, O], error) {
Expand Down Expand Up @@ -281,11 +289,11 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
defer func() {
if err := fctx.finish(ctx); err != nil {
// TODO(jba): do something more with this error?
logger(ctx).Error("flowContext.finish", "err", err.Error())
Logger(ctx).Error("flowContext.finish", "err", err.Error())
}
}()
ctx = flowContextKey.newContext(ctx, fctx)
exec := &FlowExecution{
exec := &flowExecution{
StartTime: gtime.ToMilliseconds(time.Now()),
}
state.mu.Lock()
Expand All @@ -310,14 +318,14 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
latency := time.Since(start)
if err != nil {
// TODO(jba): handle InterruptError
logger(ctx).Error("flow failed",
Logger(ctx).Error("flow failed",
"path", spanMeta.Path,
"err", err.Error(),
)
writeFlowFailure(ctx, f.name, latency, err)
spanMeta.SetAttr("flow:state", "error")
} else {
logger(ctx).Info("flow succeeded", "path", spanMeta.Path)
Logger(ctx).Info("flow succeeded", "path", spanMeta.Path)
writeFlowSuccess(ctx, f.name, latency)
spanMeta.SetAttr("flow:state", "done")

Expand Down Expand Up @@ -516,7 +524,7 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *Flow[I, O, S], input I)
}
}

func finishedOpResponse[O any](op *Operation[O]) (O, error) {
func finishedOpResponse[O any](op *operation[O]) (O, error) {
if !op.Done {
return internal.Zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID)
}
Expand Down
47 changes: 45 additions & 2 deletions go/genkit/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package genkit

import (
"context"
"encoding/json"
"slices"
"testing"

Expand All @@ -39,13 +40,13 @@ func TestFlowStart(t *testing.T) {
t.Fatal(err)
}
got := state.Operation
want := &Operation[int]{
want := &operation[int]{
Done: true,
Result: &FlowResult[int]{
Response: 2,
},
}
if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(Operation[int]{}, "FlowID")); diff != "" {
if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(operation[int]{}, "FlowID")); diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}
Expand Down Expand Up @@ -123,3 +124,45 @@ func TestStreamFlow(t *testing.T) {
return true
})
}

func TestFlowState(t *testing.T) {
// A flowState is an action output, so it must support JSON marshaling.
// Verify that a fully populated flowState can round-trip via JSON.

fs := &flowState[int, int]{
FlowID: "id",
FlowName: "name",
StartTime: 1,
Input: 2,
Cache: map[string]json.RawMessage{"x": json.RawMessage([]byte("3"))},
EventsTriggered: map[string]any{"a": "b"},
Executions: []*flowExecution{{StartTime: 4, EndTime: 5, TraceIDs: []string{"c"}}},
Operation: &operation[int]{
FlowID: "id",
BlockedOnStep: &struct {
Name string `json:"name"`
Schema string `json:"schema"`
}{Name: "bos", Schema: "s"},
Done: true,
Metadata: "meta",
Result: &FlowResult[int]{
Response: 6,
Error: "err",
StackTrace: "st",
},
},
TraceContext: "tc",
}
data, err := json.Marshal(fs)
if err != nil {
t.Fatal(err)
}
var got *flowState[int, int]
if err := json.Unmarshal(data, &got); err != nil {
t.Fatal(err)
}
diff := cmp.Diff(fs, got, cmpopts.IgnoreUnexported(flowState[int, int]{}))
if diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}
12 changes: 1 addition & 11 deletions go/genkit/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,10 @@ type flowError struct {
Stacktrace string `json:"stacktrace,omitempty"`
}

type FlowInvokeEnvelopeMessageRunScheduled struct {
FlowID string `json:"flowId,omitempty"`
}

type FlowExecution struct {
type flowExecution struct {
// end time in milliseconds since the epoch
EndTime gtime.Milliseconds `json:"endTime,omitempty"`
// start time in milliseconds since the epoch
StartTime gtime.Milliseconds `json:"startTime,omitempty"`
TraceIDs []string `json:"traceIds,omitempty"`
}

// BlockedOnStep describes the step of the flow that the flow is blocked on.
type BlockedOnStep struct {
Name string `json:"name,omitempty"`
Schema string `json:"schema,omitempty"`
}
9 changes: 2 additions & 7 deletions go/genkit/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@ func init() {

var loggerKey = newContextKey[*slog.Logger]()

// logger returns the Logger in ctx, or the default Logger
// Logger returns the Logger in ctx, or the default Logger
// if there is none.
func logger(ctx context.Context) *slog.Logger {
func Logger(ctx context.Context) *slog.Logger {
if l := loggerKey.fromContext(ctx); l != nil {
return l
}
return slog.Default()
}

// DebugLog is a helper function for plugins to log debugging info.
func DebugLog(ctx context.Context, msg string, args ...any) {
logger(ctx).Debug(msg, args...)
}
2 changes: 1 addition & 1 deletion go/genkit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var fetchInstruments = sync.OnceValue(func() *metricInstruments {
insts, err := initInstruments()
if err != nil {
// Do not stop the program because we can't collect metrics.
logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err)
Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err)
return nil
}
return insts
Expand Down
2 changes: 1 addition & 1 deletion go/genkit/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newRegistry() (*registry, error) {
return r, nil
}

// An Environment is the execution context that the program is running in.
// An Environment is the execution context in which the program is running.
type Environment string

const (
Expand Down
8 changes: 3 additions & 5 deletions go/genkit/schemas.config
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ GenerationCommonConfig.topK type int

GenerateRequestOutputFormat name OutputFormat

OperationBlockedOnStep name BlockedOnStep
OperationBlockedOnStep doc
BlockedOnStep describes the step of the flow that the flow is blocked on.
.
OperationBlockedOnStep omit

OutputFormatJson name OutputFormatJSON

Expand All @@ -66,9 +63,10 @@ FlowInvokeEnvelopeMessageRetry omit
FlowInvokeEnvelopeMessageSchedule omit
FlowInvokeEnvelopeMessageStart omit
FlowInvokeEnvelopeMessageState omit
FlowInvokeEnvelopeMessageRunScheduled omit
Operation omit

FlowStateExecution name FlowExecution
FlowStateExecution name flowExecution
FlowStateExecution.startTime type gtime.Milliseconds
FlowStateExecution.endTime type gtime.Milliseconds

Expand Down
2 changes: 1 addition & 1 deletion go/genkit/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runInNewSpan[I, O any](
f func(context.Context, I) (O, error),
) (O, error) {
// TODO(jba): support span links.
log := logger(ctx)
log := Logger(ctx)
log.Debug("span start", "name", name)
defer log.Debug("span end", "name", name)
sm := &spanMetadata{
Expand Down
2 changes: 1 addition & 1 deletion go/plugins/localvec/localvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *retriever) Index(ctx context.Context, req *ai.IndexerRequest) error {
}

if _, ok := r.data[id]; ok {
genkit.DebugLog(ctx, "localvec skipping document because already present", "id", id)
genkit.Logger(ctx).Debug("localvec skipping document because already present", "id", id)
continue
}

Expand Down