Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/core/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func init() {
type registry struct {
tstate *tracing.State
mu sync.Mutex
frozen bool // when true, no more additions
actions map[string]action
flows []flow
// TraceStores, at most one for each [Environment].
Expand Down Expand Up @@ -98,6 +99,9 @@ func (r *registry) registerAction(a action) {
key := fmt.Sprintf("/%s/%s", a.actionType(), a.Name())
r.mu.Lock()
defer r.mu.Unlock()
if r.frozen {
panic(fmt.Sprintf("attempt to register action %s in a frozen registry. Register before calling genkit.Init", key))
}
if _, ok := r.actions[key]; ok {
panic(fmt.Sprintf("action %q is already registered", key))
}
Expand All @@ -108,6 +112,12 @@ func (r *registry) registerAction(a action) {
"name", a.Name())
}

func (r *registry) freeze() {
r.mu.Lock()
defer r.mu.Unlock()
r.frozen = true
}

// lookupAction returns the action for the given key, or nil if there is none.
func (r *registry) lookupAction(key string) action {
r.mu.Lock()
Expand Down
67 changes: 52 additions & 15 deletions go/core/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,51 @@ import (
// a dev server.
//
// StartFlowServer always returns a non-nil error, the one returned by http.ListenAndServe.
func StartFlowServer(addr string) error {
func StartFlowServer(addr string, flows []string) error {
return startProdServer(addr, flows)
}

// InternalInit is for use by the genkit package only.
// It is not subject to compatibility guarantees.
func InternalInit(opts *Options) error {
if opts == nil {
opts = &Options{}
}
globalRegistry.freeze()

if currentEnvironment() == EnvironmentDev {
go func() {
err := startDevServer("")
err := startDevServer()
slog.Error("dev server stopped", "err", err)
}()
}
return startProdServer(addr)
if opts.FlowAddr == "-" {
return nil
}
return StartFlowServer(opts.FlowAddr, opts.Flows)
}

// startDevServer starts the development server (reflection API) listening at the given address.
// If addr is "", it uses the value of the environment variable GENKIT_REFLECTION_PORT
// for the port, and if that is empty it uses ":3100".
// Options are options to [InternalInit].
type Options struct {
// If "-", do not start a FlowServer.
// Otherwise, start a FlowServer on the given address, or the
// default if empty.
FlowAddr string
// The names of flows to serve.
// If empty, all registered flows are served.
Flows []string
}

// startDevServer starts the development server (reflection API) listening at the
// value of the environment variable GENKIT_REFLECTION_PORT for the port, or ":3100"
// if it is empty.
// startDevServer always returns a non-nil error, the one returned by http.ListenAndServe.
func startDevServer(addr string) error {
func startDevServer() error {
slog.Info("starting dev server")
addr = serverAddress(addr, "GENKIT_REFLECTION_PORT", "127.0.0.1:3100")
// Don't use "localhost" here. That only binds the IPv4 address, and the genkit tool
// wants to connect to the IPv6 address even when you tell it to use "localhost".
// Omitting the host works.
addr := serverAddress("", "GENKIT_REFLECTION_PORT", "127.0.0.1:3100")
mux := newDevServeMux(globalRegistry)
return listenAndServe(addr, mux)
}
Expand Down Expand Up @@ -243,14 +271,17 @@ type listFlowStatesResult struct {
// startProdServer always returns a non-nil error, the one returned by http.ListenAndServe.
//
// To construct a server with additional routes, use [NewFlowServeMux].
func startProdServer(addr string) error {
func startProdServer(addr string, flows []string) error {
slog.Info("starting flow server")
addr = serverAddress(addr, "PORT", "127.0.0.1:3400")
mux := NewFlowServeMux()
mux := NewFlowServeMux(flows)
return listenAndServe(addr, mux)
}

// NewFlowServeMux constructs a [net/http.ServeMux] where each defined flow is a route.
// NewFlowServeMux constructs a [net/http.ServeMux].
// If flows is non-empty, the each of the named flows is registered as a route.
// Otherwise, all defined flows are registered.
//
// All routes take a single query parameter, "stream", which if true will stream the
// flow's results back to the client. (Not all flows support streaming, however.)
//
Expand All @@ -259,14 +290,20 @@ func startProdServer(addr string) error {
//
// mainMux := http.NewServeMux()
// mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", NewFlowServeMux()))
func NewFlowServeMux() *http.ServeMux {
return newFlowServeMux(globalRegistry)
func NewFlowServeMux(flows []string) *http.ServeMux {
return newFlowServeMux(globalRegistry, flows)
}

func newFlowServeMux(r *registry) *http.ServeMux {
func newFlowServeMux(r *registry, flows []string) *http.ServeMux {
mux := http.NewServeMux()
m := map[string]bool{}
for _, f := range flows {
m[f] = true
}
for _, f := range r.listFlows() {
handle(mux, "POST /"+f.Name(), nonDurableFlowHandler(f))
if len(flows) == 0 || m[f.Name()] {
handle(mux, "POST /"+f.Name(), nonDurableFlowHandler(f))
}
}
return mux
}
Expand Down
2 changes: 1 addition & 1 deletion go/core/servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestProdServer(t *testing.T) {
defineFlow(r, "inc", func(_ context.Context, i int, _ NoStream) (int, error) {
return i + 1, nil
})
srv := httptest.NewServer(newFlowServeMux(r))
srv := httptest.NewServer(newFlowServeMux(r, nil))
defer srv.Close()

check := func(t *testing.T, input string, wantStatus, wantResult int) {
Expand Down
43 changes: 38 additions & 5 deletions go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@ import (
"github.com/firebase/genkit/go/core"
)

// Options are options to [Init].
type Options struct {
// If "-", do not start a FlowServer.
// Otherwise, start a FlowServer on the given address, or the
// default of ":3400" if empty.
FlowAddr string
// The names of flows to serve.
// If empty, all registered flows are served.
Flows []string
}

// Init initializes Genkit.
// After it is called, no further actions can be defined.
//
// Init starts servers depending on the value of the GENKIT_ENV
// environment variable and the provided options.
//
// If GENKIT_ENV = "dev", a development server is started
// in a separate goroutine at the address in opts.DevAddr, or the default
// of ":3100" if empty.
//
// If opts.FlowAddr is a value other than "-", a flow server is started (see [StartFlowServer])
// and the call to Init waits for the server to shut down.
// If opts.FlowAddr == "-", no flow server is started and Init returns immediately.
//
// Thus Init(nil) will start a dev server in the "dev" environment, will always start
// a flow server, and will pause execution until the flow server terminates.
func Init(opts *Options) error {
return core.InternalInit((*core.Options)(opts))
}

// DefineFlow creates a Flow that runs fn, and registers it as an action.
//
// fn takes an input of type In and returns an output of type Out.
Expand Down Expand Up @@ -120,11 +151,13 @@ var errStop = errors.New("stop")
// a dev server.
//
// StartFlowServer always returns a non-nil error, the one returned by http.ListenAndServe.
func StartFlowServer(addr string) error {
return core.StartFlowServer(addr)
func StartFlowServer(addr string, flows []string) error {
return core.StartFlowServer(addr, flows)
}

// NewFlowServeMux constructs a [net/http.ServeMux] where each defined flow is a route.
// NewFlowServeMux constructs a [net/http.ServeMux].
// If flows is non-empty, the each of the named flows is registered as a route.
// Otherwise, all defined flows are registered.
// All routes take a single query parameter, "stream", which if true will stream the
// flow's results back to the client. (Not all flows support streaming, however.)
//
Expand All @@ -133,6 +166,6 @@ func StartFlowServer(addr string) error {
//
// mainMux := http.NewServeMux()
// mainMux.Handle("POST /flow/", http.StripPrefix("/flow/", NewFlowServeMux()))
func NewFlowServeMux() *http.ServeMux {
return core.NewFlowServeMux()
func NewFlowServeMux(flows []string) *http.ServeMux {
return core.NewFlowServeMux(flows)
}
Loading