diff --git a/docs-go/flows b/docs-go/flows new file mode 100644 index 0000000000..e68de5ac5f --- /dev/null +++ b/docs-go/flows @@ -0,0 +1,111 @@ +# Flows + +Flows are wrapped functions with some additional characteristics over direct +calls: they are strongly typed, streamable, locally and remotely callable, and +fully observable. +Firebase Genkit provides CLI and developer UI tooling for running and debugging flows. + +## Defining flows + +In its simplest form, a flow just wraps a function: + +- {Go} + + %include ../go/internal/doc-snippets/flows.go flow1 + +Doing so lets you run the function from the Genkit CLI and developer UI, and is +a requirement for many of Genkit's features, including deployment and +observability. + +An important advantage Genkit flows have over directly calling a model API is +type safety of both inputs and outputs: + +- {Go} + + The argument and result types of a flow can be simple or structured values. + Genkit will produce JSON schemas for these values using + [`invopop/jsonschema`](https://pkg.go.dev/github.com/invopop/jsonschema). + + The following flow takes a `string` as input and outputs a `struct`: + + %include ../go/internal/doc-snippets/flows.go msug + + %include ../go/internal/doc-snippets/flows.go flow2 + +## Running flows + +To run a flow in your code: + +- {Go} + + %include ../go/internal/doc-snippets/flows.go run1 + +You can use the CLI to run flows as well: + +```posix-terminal +genkit flow:run menuSuggestionFlow '"French"' +``` + +### Streamed + +Here's a simple example of a flow that can stream values: + +- {Go} + + %include ../go/internal/doc-snippets/flows.go streaming-types + + %include ../go/internal/doc-snippets/flows.go streaming + +Note that the streaming callback can be undefined. It's only defined if the +invoking client is requesting streamed response. + +To invoke a flow in streaming mode: + +- {Go} + + %include ../go/internal/doc-snippets/flows.go invoke-streaming + + If the flow doesn't implement streaming, `StreamFlow()` behaves identically to + `RunFlow()`. + +You can use the CLI to stream flows as well: + +```posix-terminal +genkit flow:run menuSuggestionFlow '"French"' -s +``` + +## Deploying flows + +If you want to be able to access your flow over HTTP you will need to deploy it +first. + +- {Go} + + To deploy flows using Cloud Run and similar services, define your flows, and + then call `StartFlowServer()`: + + %include ../go/internal/doc-snippets/flows.go main + + `StartFlowsServer` starts a `net/http` server that exposes your flows as HTTP + endpoints (for example, `http://localhost:3400/menuSuggestionFlow`). Both + parameters are optional: + + - You can specify the address and port to listen on. If you don't, + the server listens on any address and the port specified by the PORT + environment variable; if that is empty, it uses the default of port 3400. + - You can specify which flows to serve. If you don't, `StartFlowsServer` + serves all of your defined flows. + + If you want to serve flows on the same host and port as other endpoints, you + can call `NewFlowServeMux()` to get a handler for your Genkit flows, which you + can multiplex with your other route handlers: + + %include ../go/internal/doc-snippets/flows.go mux + +## Flow observability + +Sometimes when using 3rd party SDKs that are not instrumented for observability, +you might want to see them as a separate trace step in the Developer UI. All you +need to do is wrap the code in the `run` function. + +%include ../go/internal/doc-snippets/flows.go run diff --git a/docs-go/flows.md b/docs-go/flows.md index 95a93fe334..8f97f1ff5a 100644 --- a/docs-go/flows.md +++ b/docs-go/flows.md @@ -1,3 +1,5 @@ + + # Flows Flows are wrapped functions with some additional characteristics over direct @@ -13,12 +15,11 @@ In its simplest form, a flow just wraps a function: ```go menuSuggestionFlow := genkit.DefineFlow( - "menuSuggestionFlow", - func(ctx context.Context, restaurantTheme string) (string, error) { - suggestion := makeMenuItemSuggestion(restaurantTheme) - return suggestion, nil - }, - ) + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + suggestion := makeMenuItemSuggestion(restaurantTheme) + return suggestion, nil + }) ``` Doing so lets you run the function from the Genkit CLI and developer UI, and is @@ -38,19 +39,19 @@ type safety of both inputs and outputs: ```go type MenuSuggestion struct { - ItemName string `json:"item_name"` - Description string `json:"description"` - Calories int `json:"calories"` + ItemName string `json:"item_name"` + Description string `json:"description"` + Calories int `json:"calories"` } ``` ```go menuSuggestionFlow := genkit.DefineFlow( - "menuSuggestionFlow", - func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) { - suggestion := makeStructuredMenuItemSuggestion(restaurantTheme) - return suggestion, nil - }, + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) { + suggestion := makeStructuredMenuItemSuggestion(restaurantTheme) + return suggestion, nil + }, ) ``` @@ -78,32 +79,34 @@ Here's a simple example of a flow that can stream values: ```go // Types for illustrative purposes. - type inputType string - type outputType string - type streamType string + type InputType string + type OutputType string + type StreamType string + ``` - menuSuggestionFlow := genkit.DefineFlow( - "menuSuggestionFlow", - func( - ctx context.Context, - restaurantTheme inputType, - callback func(context.Context, streamType) error, - ) (outputType, error) { - var menu strings.Builder - menuChunks := make(chan streamType) - go makeFullMenuSuggestion(restaurantTheme, menuChunks) - for { - chunk, ok := <-menuChunks - if !ok { - break - } - if callback != nil { - callback(context.Background(), chunk) - } - menu.WriteString(string(chunk)) - } - return outputType(menu.String()), nil - }, + ```go + menuSuggestionFlow := genkit.DefineStreamingFlow( + "menuSuggestionFlow", + func( + ctx context.Context, + restaurantTheme InputType, + callback func(context.Context, StreamType) error, + ) (OutputType, error) { + var menu strings.Builder + menuChunks := make(chan StreamType) + go makeFullMenuSuggestion(restaurantTheme, menuChunks) + for { + chunk, ok := <-menuChunks + if !ok { + break + } + if callback != nil { + callback(context.Background(), chunk) + } + menu.WriteString(string(chunk)) + } + return OutputType(menu.String()), nil + }, ) ``` @@ -116,16 +119,16 @@ To invoke a flow in streaming mode: ```go genkit.StreamFlow( - context.Background(), - menuSuggestionFlow, - "French", - )(func(sfv *genkit.StreamFlowValue[outputType, streamType], err error) bool { - if !sfv.Done { - fmt.Print(sfv.Output) - return true - } else { - return false - } + context.Background(), + menuSuggestionFlow, + "French", + )(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool { + if !sfv.Done { + fmt.Print(sfv.Output) + return true + } else { + return false + } }) ``` @@ -150,16 +153,17 @@ first. ```go func main() { - genkit.DefineFlow( - "menuSuggestionFlow", - func(ctx context.Context, restaurantTheme string) (string, error) { - // ... - }, - ) - err := genkit.StartFlowServer(":1234", []string{}) - - // startProdServer always returns a non-nil error: the one returned by - // http.ListenAndServe. + genkit.DefineFlow( + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + // ... + return "", nil + }, + ) + // StartFlowServer always returns a non-nil error: the one returned by + // http.ListenAndServe. + err := genkit.StartFlowServer(":1234", []string{}) + log.Fatal(err) } ``` @@ -179,7 +183,7 @@ first. ```go mainMux := http.NewServeMux() - mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux())) + mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil))) ``` ## Flow observability @@ -190,13 +194,14 @@ need to do is wrap the code in the `run` function. ```go genkit.DefineFlow( - "menuSuggestionFlow", - func(ctx context.Context, restaurantTheme string) (string, error) { - themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) { - // ... - }) - - // ... - }, -) + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) { + // ... + return "", nil + }) + + // ... + return themes, err + }) ``` diff --git a/docs-go/generate.sh b/docs-go/generate.sh new file mode 100755 index 0000000000..44e011c5b2 --- /dev/null +++ b/docs-go/generate.sh @@ -0,0 +1,11 @@ +#!/bin/sh -e + +weave=$HOME/go/bin/weave +if [[ ! -f $weave ]]; then + echo "installing weave" + go -C ../go install ./internal/cmd/weave +fi + +$weave flows > flows.md + + diff --git a/go/internal/cmd/weave/weave.go b/go/internal/cmd/weave/weave.go new file mode 100644 index 0000000000..14e6450d88 --- /dev/null +++ b/go/internal/cmd/weave/weave.go @@ -0,0 +1,228 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The weave command is a simple preprocessor for markdown files. +// It builds a table of contents and processes %include directives. +// +// Example usage: +// +// $ go run internal/cmd/weave go-types.md > README.md +// +// The weave command copies lines of the input file to standard output, with two +// exceptions: +// +// If a line begins with "%toc", it is replaced with a table of contents +// consisting of links to the top two levels of headers ("#" and "##"). +// +// If a line begins with "%include FILENAME TAG", it is replaced with the lines +// of the file between lines containing "!+TAG" and "!-TAG". TAG can be omitted, +// in which case the delimiters are simply "!+" and "!-". +package main + +// Modified from golang.org/x/example/internal/cmd/weave/weave.go. + +import ( + "bufio" + "bytes" + "fmt" + "log" + "os" + "regexp" + "strings" +) + +func main() { + log.SetFlags(0) + log.SetPrefix("weave: ") + if len(os.Args) != 2 { + log.Fatal("usage: weave input.md\n") + } + + f, err := os.Open(os.Args[1]) + if err != nil { + log.Fatal(err) + } + defer f.Close() + + fmt.Println("") + fmt.Println() + + // Pass 1: extract table of contents. + var toc []string + in := bufio.NewScanner(f) + for in.Scan() { + line := in.Text() + if line == "" || (line[0] != '#' && line[0] != '%') { + continue + } + line = strings.TrimSpace(line) + if line == "%toc" { + toc = nil + } else if strings.HasPrefix(line, "# ") || strings.HasPrefix(line, "## ") { + words := strings.Fields(line) + depth := len(words[0]) + words = words[1:] + text := strings.Join(words, " ") + for i := range words { + words[i] = strings.ToLower(words[i]) + } + line = fmt.Sprintf("%s1. [%s](#%s)", + strings.Repeat("\t", depth-1), text, strings.Join(words, "-")) + toc = append(toc, line) + } + } + if in.Err() != nil { + log.Fatal(in.Err()) + } + + // Pass 2. + if _, err := f.Seek(0, os.SEEK_SET); err != nil { + log.Fatalf("can't rewind input: %v", err) + } + in = bufio.NewScanner(f) + for in.Scan() { + line := in.Text() + tline := strings.TrimSpace(line) + switch { + case strings.HasPrefix(tline, "%toc"): // ToC + for _, h := range toc { + fmt.Println(h) + } + case strings.HasPrefix(tline, "%include"): + // Indent the output by the whitespace preceding "%include". + indent := line[:strings.IndexByte(line, '%')] + words := strings.Fields(line) + if len(words) < 2 { + log.Fatal(line) + } + filename := words[1] + + section := "" + if len(words) > 2 { + section = words[2] + } + s, err := include(filename, section) + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s```go\n", indent) + fmt.Println(cleanListing(s, indent)) // TODO(adonovan): escape /^```/ in s + fmt.Printf("%s```\n", indent) + default: + fmt.Println(line) + } + } + if in.Err() != nil { + log.Fatal(in.Err()) + } +} + +// include processes an included file, and returns the included text. +// Only lines between those matching !+tag and !-tag will be returned. +// This is true even if tag=="". +func include(file, tag string) (string, error) { + f, err := os.Open(file) + if err != nil { + return "", err + } + defer f.Close() + + startre, err := regexp.Compile("!\\+" + tag + "$") + if err != nil { + return "", err + } + endre, err := regexp.Compile("!\\-" + tag + "$") + if err != nil { + return "", err + } + + var text bytes.Buffer + in := bufio.NewScanner(f) + var on bool + for in.Scan() { + line := in.Text() + switch { + case startre.MatchString(line): + on = true + case endre.MatchString(line): + on = false + case on: + text.WriteByte('\t') + text.WriteString(line) + text.WriteByte('\n') + } + } + if in.Err() != nil { + return "", in.Err() + } + if text.Len() == 0 { + return "", fmt.Errorf("no lines of %s matched tag %q", file, tag) + } + return text.String(), nil +} + +func isBlank(line string) bool { return strings.TrimSpace(line) == "" } + +func indented(line string) bool { + return strings.HasPrefix(line, " ") || strings.HasPrefix(line, "\t") +} + +// cleanListing removes entirely blank leading and trailing lines from +// text, and removes n leading tabs. +// It then prefixes each line with indent. +func cleanListing(text, indent string) string { + lines := strings.Split(text, "\n") + + // remove minimum number of leading tabs from all non-blank lines + tabs := 999 + for i, line := range lines { + if strings.TrimSpace(line) == "" { + lines[i] = "" + } else { + if n := leadingTabs(line); n < tabs { + tabs = n + } + } + } + for i, line := range lines { + if line != "" { + line := line[tabs:] + lines[i] = line // remove leading tabs + } + } + + // remove leading blank lines + for len(lines) > 0 && lines[0] == "" { + lines = lines[1:] + } + // remove trailing blank lines + for len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + // add indent + for i, ln := range lines { + lines[i] = indent + ln + } + return strings.Join(lines, "\n") +} + +func leadingTabs(s string) int { + var i int + for i = 0; i < len(s); i++ { + if s[i] != '\t' { + break + } + } + return i +} diff --git a/go/internal/doc-snippets/flows.go b/go/internal/doc-snippets/flows.go new file mode 100644 index 0000000000..134ed70f42 --- /dev/null +++ b/go/internal/doc-snippets/flows.go @@ -0,0 +1,161 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snippets + +import ( + "context" + "fmt" + "log" + "net/http" + "strings" + + "github.com/firebase/genkit/go/genkit" +) + +func f1() { + // !+flow1 + menuSuggestionFlow := genkit.DefineFlow( + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + suggestion := makeMenuItemSuggestion(restaurantTheme) + return suggestion, nil + }) + // !-flow1 + _ = menuSuggestionFlow + +} + +// !+msug +type MenuSuggestion struct { + ItemName string `json:"item_name"` + Description string `json:"description"` + Calories int `json:"calories"` +} + +// !-msug + +func makeMenuItemSuggestion(string) string { return "" } + +func f2() { + // !+flow2 + menuSuggestionFlow := genkit.DefineFlow( + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (MenuSuggestion, error) { + suggestion := makeStructuredMenuItemSuggestion(restaurantTheme) + return suggestion, nil + }, + ) + // !-flow2 + // !+run1 + suggestion, err := genkit.RunFlow(context.Background(), menuSuggestionFlow, "French") + // !-run1 + _ = suggestion + _ = err +} + +// !+streaming-types +// Types for illustrative purposes. +type InputType string +type OutputType string +type StreamType string + +//!-streaming-types + +func f3() { + // !+streaming + menuSuggestionFlow := genkit.DefineStreamingFlow( + "menuSuggestionFlow", + func( + ctx context.Context, + restaurantTheme InputType, + callback func(context.Context, StreamType) error, + ) (OutputType, error) { + var menu strings.Builder + menuChunks := make(chan StreamType) + go makeFullMenuSuggestion(restaurantTheme, menuChunks) + for { + chunk, ok := <-menuChunks + if !ok { + break + } + if callback != nil { + callback(context.Background(), chunk) + } + menu.WriteString(string(chunk)) + } + return OutputType(menu.String()), nil + }, + ) + // !-streaming + + // !+invoke-streaming + genkit.StreamFlow( + context.Background(), + menuSuggestionFlow, + "French", + )(func(sfv *genkit.StreamFlowValue[OutputType, StreamType], err error) bool { + if !sfv.Done { + fmt.Print(sfv.Output) + return true + } else { + return false + } + }) + // !-invoke-streaming +} + +func makeStructuredMenuItemSuggestion(string) MenuSuggestion { return MenuSuggestion{} } + +func makeFullMenuSuggestion(restaurantTheme InputType, menuChunks chan StreamType) { + +} + +// !+main +func main() { + genkit.DefineFlow( + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + // ... + return "", nil + }, + ) + // StartFlowServer always returns a non-nil error: the one returned by + // http.ListenAndServe. + err := genkit.StartFlowServer(":1234", []string{}) + log.Fatal(err) +} + +// !-main + +func f4() { + // !+mux + mainMux := http.NewServeMux() + mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", genkit.NewFlowServeMux(nil))) + // !-mux + // !+run + genkit.DefineFlow( + "menuSuggestionFlow", + func(ctx context.Context, restaurantTheme string) (string, error) { + themes, err := genkit.Run(ctx, "find-similar-themes", func() (string, error) { + // ... + return "", nil + }) + + // ... + return themes, err + }) + // !-run + +} diff --git a/go/internal/doc-snippets/models.go b/go/internal/doc-snippets/models.go new file mode 100644 index 0000000000..6fdd3f3b58 --- /dev/null +++ b/go/internal/doc-snippets/models.go @@ -0,0 +1,15 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snippets