diff --git a/docs-go/flows.md b/docs-go/flows.md index 8f97f1ff5a..107520f674 100644 --- a/docs-go/flows.md +++ b/docs-go/flows.md @@ -62,7 +62,7 @@ To run a flow in your code: - {Go} ```go - suggestion, err := genkit.RunFlow(context.Background(), menuSuggestionFlow, "French") + suggestion, err := menuSuggestionFlow.Run(context.Background(), "French") ``` You can use the CLI to run flows as well: @@ -118,11 +118,10 @@ To invoke a flow in streaming mode: - {Go} ```go - genkit.StreamFlow( + menuSuggestionFlow.Stream( context.Background(), - menuSuggestionFlow, "French", - )(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool { + )(func(sfv *core.StreamFlowValue[OutputType, StreamType], err error) bool { if !sfv.Done { fmt.Print(sfv.Output) return true diff --git a/go/core/flow.go b/go/core/flow.go index 73c9a4e7a9..cb407203a5 100644 --- a/go/core/flow.go +++ b/go/core/flow.go @@ -538,29 +538,63 @@ func InternalRun[Out any](ctx context.Context, name string, f func() (Out, error }) } -// InternalRunFlow is for use by genkit.RunFlow exclusively. -// It is not subject to any backwards compatibility guarantees. -func InternalRunFlow[In, Out, Stream any](ctx context.Context, flow *Flow[In, Out, Stream], input In) (Out, error) { - state, err := flow.start(ctx, input, nil) +// Run runs the flow in the context of another flow. The flow must run to completion when started +// (that is, it must not have interrupts). +func (f *Flow[In, Out, Stream]) Run(ctx context.Context, input In) (Out, error) { + return f.run(ctx, input, nil) +} + +func (f *Flow[In, Out, Stream]) run(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) { + state, err := f.start(ctx, input, cb) if err != nil { return internal.Zero[Out](), err } return finishedOpResponse(state.Operation) } -// InternalStreamFlow is for use by genkit.StreamFlow exclusively. -// It is not subject to any backwards compatibility guarantees. -func InternalStreamFlow[In, Out, Stream any](ctx context.Context, flow *Flow[In, Out, Stream], input In, callback func(context.Context, Stream) error) (Out, error) { - state, err := flow.start(ctx, input, callback) - if err != nil { - return internal.Zero[Out](), err - } - if ctx.Err() != nil { - return internal.Zero[Out](), ctx.Err() +// StreamFlowValue is either a streamed value or a final output of a flow. +type StreamFlowValue[Out, Stream any] struct { + Done bool + Output Out // valid if Done is true + Stream Stream // valid if Done is false +} + +// Stream runs the flow on input and delivers both the streamed values and the final output. +// It returns a function whose argument function (the "yield function") will be repeatedly +// called with the results. +// +// If the yield function is passed a non-nil error, the flow has failed with that +// error; the yield function will not be called again. An error is also passed if +// the flow fails to complete (that is, it has an interrupt). +// Genkit Go does not yet support interrupts. +// +// If the yield function's [StreamFlowValue] argument has Done == true, the value's +// Output field contains the final output; the yield function will not be called +// again. +// +// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result. +func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamFlowValue[Out, Stream], error) bool) { + return func(yield func(*StreamFlowValue[Out, Stream], error) bool) { + cb := func(ctx context.Context, s Stream) error { + if ctx.Err() != nil { + return ctx.Err() + } + if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) { + return errStop + } + return nil + } + output, err := f.run(ctx, input, cb) + if err != nil { + yield(nil, err) + } else { + yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil) + } } - return finishedOpResponse(state.Operation) } +var errStop = errors.New("stop") + func finishedOpResponse[O any](op *operation[O]) (O, error) { if !op.Done { return internal.Zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID) diff --git a/go/core/flow_test.go b/go/core/flow_test.go index 95bb89c6ac..843da267a4 100644 --- a/go/core/flow_test.go +++ b/go/core/flow_test.go @@ -94,7 +94,7 @@ func TestRunFlow(t *testing.T) { t.Fatal(err) } f := defineFlow(reg, "inc", incFlow) - got, err := InternalRunFlow(context.Background(), f, 2) + got, err := f.Run(context.Background(), 2) if err != nil { t.Fatal(err) } diff --git a/go/genkit/genkit.go b/go/genkit/genkit.go index 50a162c083..467b5e9d7f 100644 --- a/go/genkit/genkit.go +++ b/go/genkit/genkit.go @@ -17,7 +17,6 @@ package genkit import ( "context" - "errors" "net/http" "github.com/firebase/genkit/go/core" @@ -94,55 +93,6 @@ func Run[Out any](ctx context.Context, name string, f func() (Out, error)) (Out, return core.InternalRun(ctx, name, f) } -// RunFlow runs flow in the context of another flow. The flow must run to completion when started -// (that is, it must not have interrupts). -func RunFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) (Out, error) { - return core.InternalRunFlow(ctx, flow, input) -} - -// StreamFlowValue is either a streamed value or a final output of a flow. -type StreamFlowValue[Out, Stream any] struct { - Done bool - Output Out // valid if Done is true - Stream Stream // valid if Done is false -} - -// StreamFlow runs flow on input and delivers both the streamed values and the final output. -// It returns a function whose argument function (the "yield function") will be repeatedly -// called with the results. -// -// If the yield function is passed a non-nil error, the flow has failed with that -// error; the yield function will not be called again. An error is also passed if -// the flow fails to complete (that is, it has an interrupt). -// Genkit Go does not yet support interrupts. -// -// If the yield function's [StreamFlowValue] argument has Done == true, the value's -// Output field contains the final output; the yield function will not be called -// again. -// -// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result. -func StreamFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) func(func(*StreamFlowValue[Out, Stream], error) bool) { - return func(yield func(*StreamFlowValue[Out, Stream], error) bool) { - cb := func(ctx context.Context, s Stream) error { - if ctx.Err() != nil { - return ctx.Err() - } - if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) { - return errStop - } - return nil - } - output, err := core.InternalStreamFlow(ctx, flow, input, cb) - if err != nil { - yield(nil, err) - } else { - yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil) - } - } -} - -var errStop = errors.New("stop") - // StartFlowServer starts a server serving the routes described in [NewFlowServeMux]. // It listens on addr, or if empty, the value of the PORT environment variable, // or if that is empty, ":3400". diff --git a/go/genkit/genkit_test.go b/go/genkit/genkit_test.go index f6d92f04f8..6cedc14093 100644 --- a/go/genkit/genkit_test.go +++ b/go/genkit/genkit_test.go @@ -17,13 +17,15 @@ package genkit import ( "context" "testing" + + "github.com/firebase/genkit/go/core" ) func TestStreamFlow(t *testing.T) { f := DefineStreamingFlow("count", count) - iter := StreamFlow(context.Background(), f, 2) + iter := f.Stream(context.Background(), 2) want := 0 - iter(func(val *StreamFlowValue[int, int], err error) bool { + iter(func(val *core.StreamFlowValue[int, int], err error) bool { if err != nil { t.Fatal(err) } diff --git a/go/internal/doc-snippets/flows.go b/go/internal/doc-snippets/flows.go index 134ed70f42..32bca2c3cb 100644 --- a/go/internal/doc-snippets/flows.go +++ b/go/internal/doc-snippets/flows.go @@ -21,6 +21,7 @@ import ( "net/http" "strings" + "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/genkit" ) @@ -59,7 +60,7 @@ func f2() { ) // !-flow2 // !+run1 - suggestion, err := genkit.RunFlow(context.Background(), menuSuggestionFlow, "French") + suggestion, err := menuSuggestionFlow.Run(context.Background(), "French") // !-run1 _ = suggestion _ = err @@ -101,11 +102,10 @@ func f3() { // !-streaming // !+invoke-streaming - genkit.StreamFlow( + menuSuggestionFlow.Stream( context.Background(), - menuSuggestionFlow, "French", - )(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool { + )(func(sfv *core.StreamFlowValue[OutputType, StreamType], err error) bool { if !sfv.Done { fmt.Print(sfv.Output) return true diff --git a/go/samples/coffee-shop/main.go b/go/samples/coffee-shop/main.go index 82d5cb08da..d700d1b0c3 100755 --- a/go/samples/coffee-shop/main.go +++ b/go/samples/coffee-shop/main.go @@ -237,7 +237,7 @@ func main() { }) genkit.DefineFlow("testAllCoffeeFlows", func(ctx context.Context, _ struct{}) (*testAllCoffeeFlowsOutput, error) { - test1, err := genkit.RunFlow(ctx, simpleGreetingFlow, &simpleGreetingInput{ + test1, err := simpleGreetingFlow.Run(ctx, &simpleGreetingInput{ CustomerName: "Sam", }) if err != nil { @@ -247,7 +247,7 @@ func main() { } return out, nil } - test2, err := genkit.RunFlow(ctx, greetingWithHistoryFlow, &customerTimeAndHistoryInput{ + test2, err := greetingWithHistoryFlow.Run(ctx, &customerTimeAndHistoryInput{ CustomerName: "Sam", CurrentTime: "09:45am", PreviousOrder: "Caramel Macchiato", diff --git a/go/samples/flow-sample1/main.go b/go/samples/flow-sample1/main.go index 14490a7b38..3317a7726d 100644 --- a/go/samples/flow-sample1/main.go +++ b/go/samples/flow-sample1/main.go @@ -51,7 +51,7 @@ func main() { }) genkit.DefineFlow("parent", func(ctx context.Context, _ struct{}) (string, error) { - return genkit.RunFlow(ctx, basic, "foo") + return basic.Run(ctx, "foo") }) type complex struct { diff --git a/go/samples/menu/s05.go b/go/samples/menu/s05.go index d124f44ea9..3d80c9dc2e 100644 --- a/go/samples/menu/s05.go +++ b/go/samples/menu/s05.go @@ -125,7 +125,7 @@ func setup05(ctx context.Context, gen, genVision *ai.Model) error { genkit.DefineFlow("s05_visionMenuQuestion", func(ctx context.Context, input *menuQuestionInput) (*answerOutput, error) { - menuText, err := genkit.RunFlow(ctx, readMenuFlow, struct{}{}) + menuText, err := readMenuFlow.Run(ctx, struct{}{}) if err != nil { return nil, err } @@ -134,7 +134,7 @@ func setup05(ctx context.Context, gen, genVision *ai.Model) error { MenuText: menuText, Question: input.Question, } - return genkit.RunFlow(ctx, textMenuQuestionFlow, questionInput) + return textMenuQuestionFlow.Run(ctx, questionInput) }, )