Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions docs-go/flows
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Flows

Flows are wrapped functions with some additional characteristics over direct
calls: they are strongly typed, streamable, locally and remotely callable, and
fully observable.
Firebase Genkit provides CLI and developer UI tooling for running and debugging flows.

## Defining flows

In its simplest form, a flow just wraps a function:

- {Go}

%include ../go/internal/doc-snippets/flows.go flow1

Doing so lets you run the function from the Genkit CLI and developer UI, and is
a requirement for many of Genkit's features, including deployment and
observability.

An important advantage Genkit flows have over directly calling a model API is
type safety of both inputs and outputs:

- {Go}

The argument and result types of a flow can be simple or structured values.
Genkit will produce JSON schemas for these values using
[`invopop/jsonschema`](https://pkg.go.dev/github.com/invopop/jsonschema).

The following flow takes a `string` as input and outputs a `struct`:

%include ../go/internal/doc-snippets/flows.go msug

%include ../go/internal/doc-snippets/flows.go flow2

## Running flows

To run a flow in your code:

- {Go}

%include ../go/internal/doc-snippets/flows.go run1

You can use the CLI to run flows as well:

```posix-terminal
genkit flow:run menuSuggestionFlow '"French"'
```

### Streamed

Here's a simple example of a flow that can stream values:

- {Go}

%include ../go/internal/doc-snippets/flows.go streaming-types

%include ../go/internal/doc-snippets/flows.go streaming

Note that the streaming callback can be undefined. It's only defined if the
invoking client is requesting streamed response.

To invoke a flow in streaming mode:

- {Go}

%include ../go/internal/doc-snippets/flows.go invoke-streaming

If the flow doesn't implement streaming, `StreamFlow()` behaves identically to
`RunFlow()`.

You can use the CLI to stream flows as well:

```posix-terminal
genkit flow:run menuSuggestionFlow '"French"' -s
```

## Deploying flows

If you want to be able to access your flow over HTTP you will need to deploy it
first.

- {Go}

To deploy flows using Cloud Run and similar services, define your flows, and
then call `StartFlowServer()`:

%include ../go/internal/doc-snippets/flows.go main

`StartFlowsServer` starts a `net/http` server that exposes your flows as HTTP
endpoints (for example, `http://localhost:3400/menuSuggestionFlow`). Both
parameters are optional:

- You can specify the address and port to listen on. If you don't,
the server listens on any address and the port specified by the PORT
environment variable; if that is empty, it uses the default of port 3400.
- You can specify which flows to serve. If you don't, `StartFlowsServer`
serves all of your defined flows.

If you want to serve flows on the same host and port as other endpoints, you
can call `NewFlowServeMux()` to get a handler for your Genkit flows, which you
can multiplex with your other route handlers:

%include ../go/internal/doc-snippets/flows.go mux

## Flow observability

Sometimes when using 3rd party SDKs that are not instrumented for observability,
you might want to see them as a separate trace step in the Developer UI. All you
need to do is wrap the code in the `run` function.

%include ../go/internal/doc-snippets/flows.go run
143 changes: 74 additions & 69 deletions docs-go/flows.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
<!-- Autogenerated by weave; DO NOT EDIT -->

# Flows

Flows are wrapped functions with some additional characteristics over direct
Expand All @@ -13,12 +15,11 @@ In its simplest form, a flow just wraps a function:

```go
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
suggestion := makeMenuItemSuggestion(restaurantTheme)
return suggestion, nil
})
```

Doing so lets you run the function from the Genkit CLI and developer UI, and is
Expand All @@ -38,19 +39,19 @@ type safety of both inputs and outputs:

```go
type MenuSuggestion struct {
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
ItemName string `json:"item_name"`
Description string `json:"description"`
Calories int `json:"calories"`
}
```

```go
menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) {
suggestion := makeStructuredMenuItemSuggestion(restaurantTheme)
return suggestion, nil
},
)
```

Expand Down Expand Up @@ -78,32 +79,34 @@ Here's a simple example of a flow that can stream values:

```go
// Types for illustrative purposes.
type inputType string
type outputType string
type streamType string
type InputType string
type OutputType string
type StreamType string
```

menuSuggestionFlow := genkit.DefineFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme inputType,
callback func(context.Context, streamType) error,
) (outputType, error) {
var menu strings.Builder
menuChunks := make(chan streamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return outputType(menu.String()), nil
},
```go
menuSuggestionFlow := genkit.DefineStreamingFlow(
"menuSuggestionFlow",
func(
ctx context.Context,
restaurantTheme InputType,
callback func(context.Context, StreamType) error,
) (OutputType, error) {
var menu strings.Builder
menuChunks := make(chan StreamType)
go makeFullMenuSuggestion(restaurantTheme, menuChunks)
for {
chunk, ok := <-menuChunks
if !ok {
break
}
if callback != nil {
callback(context.Background(), chunk)
}
menu.WriteString(string(chunk))
}
return OutputType(menu.String()), nil
},
)
```

Expand All @@ -116,16 +119,16 @@ To invoke a flow in streaming mode:

```go
genkit.StreamFlow(
context.Background(),
menuSuggestionFlow,
"French",
)(func(sfv *genkit.StreamFlowValue[outputType, streamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
} else {
return false
}
context.Background(),
menuSuggestionFlow,
"French",
)(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool {
if !sfv.Done {
fmt.Print(sfv.Output)
return true
} else {
return false
}
})
```

Expand All @@ -150,16 +153,17 @@ first.

```go
func main() {
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
},
)
err := genkit.StartFlowServer(":1234", []string{})

// startProdServer always returns a non-nil error: the one returned by
// http.ListenAndServe.
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
// ...
return "", nil
},
)
// StartFlowServer always returns a non-nil error: the one returned by
// http.ListenAndServe.
err := genkit.StartFlowServer(":1234", []string{})
log.Fatal(err)
}
```

Expand All @@ -179,7 +183,7 @@ first.

```go
mainMux := http.NewServeMux()
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux()))
mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil)))
```

## Flow observability
Expand All @@ -190,13 +194,14 @@ need to do is wrap the code in the `run` function.

```go
genkit.DefineFlow(
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
})

// ...
},
)
"menuSuggestionFlow",
func(ctx context.Context, restaurantTheme string) (string, error) {
themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) {
// ...
return "", nil
})

// ...
return themes, err
})
```
11 changes: 11 additions & 0 deletions docs-go/generate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/sh -e

weave=$HOME/go/bin/weave
if [[ ! -f $weave ]]; then
echo "installing weave"
go -C ../go install ./internal/cmd/weave
fi

$weave flows > flows.md


Loading