Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs-go/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ To run a flow in your code:
- {Go}

```go
suggestion, err := genkit.RunFlow(context.Background(), menuSuggestionFlow, "French")
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
```

You can use the CLI to run flows as well:
Expand Down Expand Up @@ -118,11 +118,10 @@ To invoke a flow in streaming mode:
- {Go}

```go
genkit.StreamFlow(
menuSuggestionFlow.Stream(
context.Background(),
menuSuggestionFlow,
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
)(func(sfv *core.StreamFlowValue[OutputType, StreamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
Expand Down
62 changes: 48 additions & 14 deletions go/core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,29 +538,63 @@ func InternalRun[Out any](ctx context.Context, name string, f func() (Out, error
})
}

// InternalRunFlow is for use by genkit.RunFlow exclusively.
// It is not subject to any backwards compatibility guarantees.
func InternalRunFlow[In, Out, Stream any](ctx context.Context, flow *Flow[In, Out, Stream], input In) (Out, error) {
state, err := flow.start(ctx, input, nil)
// Run runs the flow in the context of another flow. The flow must run to completion when started
// (that is, it must not have interrupts).
func (f *Flow[In, Out, Stream]) Run(ctx context.Context, input In) (Out, error) {
return f.run(ctx, input, nil)
}

func (f *Flow[In, Out, Stream]) run(ctx context.Context, input In, cb func(context.Context, Stream) error) (Out, error) {
state, err := f.start(ctx, input, cb)
if err != nil {
return internal.Zero[Out](), err
}
return finishedOpResponse(state.Operation)
}

// InternalStreamFlow is for use by genkit.StreamFlow exclusively.
// It is not subject to any backwards compatibility guarantees.
func InternalStreamFlow[In, Out, Stream any](ctx context.Context, flow *Flow[In, Out, Stream], input In, callback func(context.Context, Stream) error) (Out, error) {
state, err := flow.start(ctx, input, callback)
if err != nil {
return internal.Zero[Out](), err
}
if ctx.Err() != nil {
return internal.Zero[Out](), ctx.Err()
// StreamFlowValue is either a streamed value or a final output of a flow.
type StreamFlowValue[Out, Stream any] struct {
Done bool
Output Out // valid if Done is true
Stream Stream // valid if Done is false
}

// Stream runs the flow on input and delivers both the streamed values and the final output.
// It returns a function whose argument function (the "yield function") will be repeatedly
// called with the results.
//
// If the yield function is passed a non-nil error, the flow has failed with that
// error; the yield function will not be called again. An error is also passed if
// the flow fails to complete (that is, it has an interrupt).
// Genkit Go does not yet support interrupts.
//
// If the yield function's [StreamFlowValue] argument has Done == true, the value's
// Output field contains the final output; the yield function will not be called
// again.
//
// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result.
func (f *Flow[In, Out, Stream]) Stream(ctx context.Context, input In) func(func(*StreamFlowValue[Out, Stream], error) bool) {
return func(yield func(*StreamFlowValue[Out, Stream], error) bool) {
cb := func(ctx context.Context, s Stream) error {
if ctx.Err() != nil {
return ctx.Err()
}
if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) {
return errStop
}
return nil
}
output, err := f.run(ctx, input, cb)
if err != nil {
yield(nil, err)
} else {
yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil)
}
}
return finishedOpResponse(state.Operation)
}

var errStop = errors.New("stop")

func finishedOpResponse[O any](op *operation[O]) (O, error) {
if !op.Done {
return internal.Zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID)
Expand Down
2 changes: 1 addition & 1 deletion go/core/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestRunFlow(t *testing.T) {
t.Fatal(err)
}
f := defineFlow(reg, "inc", incFlow)
got, err := InternalRunFlow(context.Background(), f, 2)
got, err := f.Run(context.Background(), 2)
if err != nil {
t.Fatal(err)
}
Expand Down
50 changes: 0 additions & 50 deletions go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package genkit

import (
"context"
"errors"
"net/http"

"github.com/firebase/genkit/go/core"
Expand Down Expand Up @@ -94,55 +93,6 @@ func Run[Out any](ctx context.Context, name string, f func() (Out, error)) (Out,
return core.InternalRun(ctx, name, f)
}

// RunFlow runs flow in the context of another flow. The flow must run to completion when started
// (that is, it must not have interrupts).
func RunFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) (Out, error) {
return core.InternalRunFlow(ctx, flow, input)
}

// StreamFlowValue is either a streamed value or a final output of a flow.
type StreamFlowValue[Out, Stream any] struct {
Done bool
Output Out // valid if Done is true
Stream Stream // valid if Done is false
}

// StreamFlow runs flow on input and delivers both the streamed values and the final output.
// It returns a function whose argument function (the "yield function") will be repeatedly
// called with the results.
//
// If the yield function is passed a non-nil error, the flow has failed with that
// error; the yield function will not be called again. An error is also passed if
// the flow fails to complete (that is, it has an interrupt).
// Genkit Go does not yet support interrupts.
//
// If the yield function's [StreamFlowValue] argument has Done == true, the value's
// Output field contains the final output; the yield function will not be called
// again.
//
// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result.
func StreamFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) func(func(*StreamFlowValue[Out, Stream], error) bool) {
return func(yield func(*StreamFlowValue[Out, Stream], error) bool) {
cb := func(ctx context.Context, s Stream) error {
if ctx.Err() != nil {
return ctx.Err()
}
if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) {
return errStop
}
return nil
}
output, err := core.InternalStreamFlow(ctx, flow, input, cb)
if err != nil {
yield(nil, err)
} else {
yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil)
}
}
}

var errStop = errors.New("stop")

// StartFlowServer starts a server serving the routes described in [NewFlowServeMux].
// It listens on addr, or if empty, the value of the PORT environment variable,
// or if that is empty, ":3400".
Expand Down
6 changes: 4 additions & 2 deletions go/genkit/genkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package genkit
import (
"context"
"testing"

"github.com/firebase/genkit/go/core"
)

func TestStreamFlow(t *testing.T) {
f := DefineStreamingFlow("count", count)
iter := StreamFlow(context.Background(), f, 2)
iter := f.Stream(context.Background(), 2)
want := 0
iter(func(val *StreamFlowValue[int, int], err error) bool {
iter(func(val *core.StreamFlowValue[int, int], err error) bool {
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions go/internal/doc-snippets/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"strings"

"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/genkit"
)

Expand Down Expand Up @@ -59,7 +60,7 @@ func f2() {
)
// !-flow2
// !+run1
suggestion, err := genkit.RunFlow(context.Background(), menuSuggestionFlow, "French")
suggestion, err := menuSuggestionFlow.Run(context.Background(), "French")
// !-run1
_ = suggestion
_ = err
Expand Down Expand Up @@ -101,11 +102,10 @@ func f3() {
// !-streaming

// !+invoke-streaming
genkit.StreamFlow(
menuSuggestionFlow.Stream(
context.Background(),
menuSuggestionFlow,
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
)(func(sfv *core.StreamFlowValue[OutputType, StreamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
Expand Down
4 changes: 2 additions & 2 deletions go/samples/coffee-shop/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func main() {
})

genkit.DefineFlow("testAllCoffeeFlows", func(ctx context.Context, _ struct{}) (*testAllCoffeeFlowsOutput, error) {
test1, err := genkit.RunFlow(ctx, simpleGreetingFlow, &simpleGreetingInput{
test1, err := simpleGreetingFlow.Run(ctx, &simpleGreetingInput{
CustomerName: "Sam",
})
if err != nil {
Expand All @@ -247,7 +247,7 @@ func main() {
}
return out, nil
}
test2, err := genkit.RunFlow(ctx, greetingWithHistoryFlow, &customerTimeAndHistoryInput{
test2, err := greetingWithHistoryFlow.Run(ctx, &customerTimeAndHistoryInput{
CustomerName: "Sam",
CurrentTime: "09:45am",
PreviousOrder: "Caramel Macchiato",
Expand Down
2 changes: 1 addition & 1 deletion go/samples/flow-sample1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func main() {
})

genkit.DefineFlow("parent", func(ctx context.Context, _ struct{}) (string, error) {
return genkit.RunFlow(ctx, basic, "foo")
return basic.Run(ctx, "foo")
})

type complex struct {
Expand Down
4 changes: 2 additions & 2 deletions go/samples/menu/s05.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func setup05(ctx context.Context, gen, genVision *ai.Model) error {

genkit.DefineFlow("s05_visionMenuQuestion",
func(ctx context.Context, input *menuQuestionInput) (*answerOutput, error) {
menuText, err := genkit.RunFlow(ctx, readMenuFlow, struct{}{})
menuText, err := readMenuFlow.Run(ctx, struct{}{})
if err != nil {
return nil, err
}
Expand All @@ -134,7 +134,7 @@ func setup05(ctx context.Context, gen, genVision *ai.Model) error {
MenuText: menuText,
Question: input.Question,
}
return genkit.RunFlow(ctx, textMenuQuestionFlow, questionInput)
return textMenuQuestionFlow.Run(ctx, questionInput)
},
)

Expand Down