From dae89a7857ca1a367de4322156f063cedf7c8bfd Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 3 Jul 2024 10:30:17 -0400 Subject: [PATCH] [Go] remove internal/common Get rid of the internal/common package by moving things elsewhere. --- go/core/action.go | 6 +-- go/core/action_test.go | 3 +- go/core/core.go | 3 +- go/core/file_flow_state_store.go | 3 +- go/core/flow_state_store.go | 8 ++-- go/genkit/conformance_test.go | 3 +- go/genkit/flow.go | 22 +++++----- go/genkit/genkit.go | 3 +- go/genkit/servers.go | 20 ++++----- go/genkit/servers_test.go | 8 ++-- .../{common/common.go => action/action.go} | 36 ++-------------- go/internal/base/misc.go | 9 ++++ go/internal/registry/registry.go | 42 +++++++++++++------ 13 files changed, 80 insertions(+), 86 deletions(-) rename go/internal/{common/common.go => action/action.go} (64%) diff --git a/go/core/action.go b/go/core/action.go index 898ee81548..bdc54dbc2e 100644 --- a/go/core/action.go +++ b/go/core/action.go @@ -22,9 +22,9 @@ import ( "github.com/firebase/genkit/go/core/logger" "github.com/firebase/genkit/go/core/tracing" + "github.com/firebase/genkit/go/internal/action" "github.com/firebase/genkit/go/internal/atype" "github.com/firebase/genkit/go/internal/base" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/metrics" "github.com/firebase/genkit/go/internal/registry" "github.com/invopop/jsonschema" @@ -235,8 +235,8 @@ func (a *Action[In, Out, Stream]) RunJSON(ctx context.Context, input json.RawMes } // Desc returns a description of the action. -func (a *Action[I, O, S]) Desc() common.ActionDesc { - ad := common.ActionDesc{ +func (a *Action[I, O, S]) Desc() action.Desc { + ad := action.Desc{ Name: a.name, Description: a.description, Metadata: a.metadata, diff --git a/go/core/action_test.go b/go/core/action_test.go index b52c6a2dfa..dbd2328452 100644 --- a/go/core/action_test.go +++ b/go/core/action_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/firebase/genkit/go/internal/atype" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" ) @@ -105,7 +104,7 @@ func TestActionTracing(t *testing.T) { t.Fatal(err) } // The dev TraceStore is registered by Init, called from TestMain. - ts := registry.Global.LookupTraceStore(common.EnvironmentDev) + ts := registry.Global.LookupTraceStore(registry.EnvironmentDev) tds, _, err := ts.List(ctx, nil) if err != nil { t.Fatal(err) diff --git a/go/core/core.go b/go/core/core.go index 9f398e3624..dd1b8f79ea 100644 --- a/go/core/core.go +++ b/go/core/core.go @@ -28,7 +28,6 @@ import ( "context" "github.com/firebase/genkit/go/core/tracing" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -39,7 +38,7 @@ import ( // all pending data is stored. // RegisterTraceStore panics if called more than once. func RegisterTraceStore(ts tracing.Store) (shutdown func(context.Context) error) { - registry.Global.RegisterTraceStore(common.EnvironmentProd, ts) + registry.Global.RegisterTraceStore(registry.EnvironmentProd, ts) return registry.Global.TracingState().AddTraceStoreBatch(ts) } diff --git a/go/core/file_flow_state_store.go b/go/core/file_flow_state_store.go index 302b33a536..d01993cd34 100644 --- a/go/core/file_flow_state_store.go +++ b/go/core/file_flow_state_store.go @@ -20,7 +20,6 @@ import ( "path/filepath" "github.com/firebase/genkit/go/internal/base" - "github.com/firebase/genkit/go/internal/common" ) // A FileFlowStateStore is a FlowStateStore that writes flowStates to files. @@ -37,7 +36,7 @@ func NewFileFlowStateStore(dir string) (*FileFlowStateStore, error) { return &FileFlowStateStore{dir: dir}, nil } -func (s *FileFlowStateStore) Save(ctx context.Context, id string, fs common.FlowStater) error { +func (s *FileFlowStateStore) Save(ctx context.Context, id string, fs base.FlowStater) error { data, err := fs.ToJSON() if err != nil { return err diff --git a/go/core/flow_state_store.go b/go/core/flow_state_store.go index 27617e2a47..b5a2abc4cc 100644 --- a/go/core/flow_state_store.go +++ b/go/core/flow_state_store.go @@ -17,7 +17,7 @@ package core import ( "context" - "github.com/firebase/genkit/go/internal/common" + "github.com/firebase/genkit/go/internal/base" ) // A FlowStateStore stores flow states. @@ -25,7 +25,7 @@ import ( // A durable FlowStateStore is necessary for durable flows. type FlowStateStore interface { // Save saves the FlowState to the store, overwriting an existing one. - Save(ctx context.Context, id string, fs common.FlowStater) error + Save(ctx context.Context, id string, fs base.FlowStater) error // Load reads the FlowState with the given ID from the store. // It returns an error that is fs.ErrNotExist if there isn't one. // pfs must be a pointer to a flowState[I, O] of the correct type. @@ -35,5 +35,5 @@ type FlowStateStore interface { // nopFlowStateStore is a FlowStateStore that does nothing. type nopFlowStateStore struct{} -func (nopFlowStateStore) Save(ctx context.Context, id string, fs common.FlowStater) error { return nil } -func (nopFlowStateStore) Load(ctx context.Context, id string, pfs any) error { return nil } +func (nopFlowStateStore) Save(ctx context.Context, id string, fs base.FlowStater) error { return nil } +func (nopFlowStateStore) Load(ctx context.Context, id string, pfs any) error { return nil } diff --git a/go/genkit/conformance_test.go b/go/genkit/conformance_test.go index 66ba85c48c..245654ba8c 100644 --- a/go/genkit/conformance_test.go +++ b/go/genkit/conformance_test.go @@ -29,7 +29,6 @@ import ( "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/internal/base" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" "golang.org/x/exp/maps" ) @@ -116,7 +115,7 @@ func TestFlowConformance(t *testing.T) { if test.Trace == nil { return } - ts := r.LookupTraceStore(common.EnvironmentDev) + ts := r.LookupTraceStore(registry.EnvironmentDev) var gotTrace any if err := ts.LoadAny(resp.Telemetry.TraceID, &gotTrace); err != nil { t.Fatal(err) diff --git a/go/genkit/flow.go b/go/genkit/flow.go index f7435dce1b..9206dd0a56 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -30,7 +30,6 @@ import ( "github.com/firebase/genkit/go/core/tracing" "github.com/firebase/genkit/go/internal/atype" "github.com/firebase/genkit/go/internal/base" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/metrics" "github.com/firebase/genkit/go/internal/registry" "github.com/google/uuid" @@ -107,6 +106,9 @@ type Flow[In, Out, Stream any] struct { type noStream = func(context.Context, struct{}) error +// streamingCallback is the type of streaming callbacks. +type streamingCallback[Stream any] func(context.Context, Stream) error + // DefineFlow creates a Flow that runs fn, and registers it as an action. // // fn takes an input of type In and returns an output of type Out. @@ -152,7 +154,7 @@ func defineFlow[In, Out, Stream any](r *registry.Registry, name string, fn core. } afunc := func(ctx context.Context, inst *flowInstruction[In], cb func(context.Context, Stream) error) (*flowState[In, Out], error) { tracing.SetCustomMetadataAttr(ctx, "flow:wrapperAction", "true") - return f.runInstruction(ctx, inst, common.StreamingCallback[Stream](cb)) + return f.runInstruction(ctx, inst, streamingCallback[Stream](cb)) } core.DefineActionInRegistry(r, "", f.name, atype.Flow, metadata, nil, afunc) f.tstate = r.TracingState() @@ -239,7 +241,7 @@ func newFlowState[In, Out any](id, name string, input In) *flowState[In, Out] { } } -// flowState implements common.FlowStater. +// flowState implements base.FlowStater. func (fs *flowState[In, Out]) IsFlowState() {} func (fs *flowState[In, Out]) ToJSON() ([]byte, error) { @@ -297,7 +299,7 @@ type FlowResult[Out any] struct { // runInstruction performs one of several actions on a flow, as determined by msg. // (Called runEnvelope in the js.) -func (f *Flow[In, Out, Stream]) runInstruction(ctx context.Context, inst *flowInstruction[In], cb common.StreamingCallback[Stream]) (*flowState[In, Out], error) { +func (f *Flow[In, Out, Stream]) runInstruction(ctx context.Context, inst *flowInstruction[In], cb streamingCallback[Stream]) (*flowState[In, Out], error) { switch { case inst.Start != nil: // TODO(jba): pass msg.Start.Labels. @@ -322,7 +324,7 @@ func (f *Flow[In, Out, Stream]) runInstruction(ctx context.Context, inst *flowIn // Name returns the name that the flow was defined with. func (f *Flow[In, Out, Stream]) Name() string { return f.name } -func (f *Flow[In, Out, Stream]) runJSON(ctx context.Context, input json.RawMessage, cb common.StreamingCallback[json.RawMessage]) (json.RawMessage, error) { +func (f *Flow[In, Out, Stream]) runJSON(ctx context.Context, input json.RawMessage, cb streamingCallback[json.RawMessage]) (json.RawMessage, error) { // Validate input before unmarshaling it because invalid or unknown fields will be discarded in the process. if err := base.ValidateJSON(input, f.inputSchema); err != nil { return nil, &base.HTTPError{Code: http.StatusBadRequest, Err: err} @@ -332,7 +334,7 @@ func (f *Flow[In, Out, Stream]) runJSON(ctx context.Context, input json.RawMessa return nil, &base.HTTPError{Code: http.StatusBadRequest, Err: err} } // If there is a callback, wrap it to turn an S into a json.RawMessage. - var callback common.StreamingCallback[Stream] + var callback streamingCallback[Stream] if cb != nil { callback = func(ctx context.Context, s Stream) error { bytes, err := json.Marshal(s) @@ -360,7 +362,7 @@ func (f *Flow[In, Out, Stream]) runJSON(ctx context.Context, input json.RawMessa } // start starts executing the flow with the given input. -func (f *Flow[In, Out, Stream]) start(ctx context.Context, input In, cb common.StreamingCallback[Stream]) (_ *flowState[In, Out], err error) { +func (f *Flow[In, Out, Stream]) start(ctx context.Context, input In, cb streamingCallback[Stream]) (_ *flowState[In, Out], err error) { flowID, err := generateFlowID() if err != nil { return nil, err @@ -377,7 +379,7 @@ func (f *Flow[In, Out, Stream]) start(ctx context.Context, input In, cb common.S // // This function corresponds to Flow.executeSteps in the js, but does more: // it creates the flowContext and saves the state. -func (f *Flow[In, Out, Stream]) execute(ctx context.Context, state *flowState[In, Out], dispatchType string, cb common.StreamingCallback[Stream]) { +func (f *Flow[In, Out, Stream]) execute(ctx context.Context, state *flowState[In, Out], dispatchType string, cb streamingCallback[Stream]) { fctx := newFlowContext(state, f.stateStore, f.tstate) defer func() { if err := fctx.finish(ctx); err != nil { @@ -477,7 +479,7 @@ type flowContext[I, O any] struct { // flowContexter is the type of all flowContext[I, O]. type flowContexter interface { uniqueStepName(string) string - stater() common.FlowStater + stater() base.FlowStater tracingState() *tracing.State } @@ -489,7 +491,7 @@ func newFlowContext[I, O any](state *flowState[I, O], store core.FlowStateStore, seenSteps: map[string]int{}, } } -func (fc *flowContext[I, O]) stater() common.FlowStater { return fc.state } +func (fc *flowContext[I, O]) stater() base.FlowStater { return fc.state } func (fc *flowContext[I, O]) tracingState() *tracing.State { return fc.tstate } // finish is called at the end of a flow execution. diff --git a/go/genkit/genkit.go b/go/genkit/genkit.go index 0eb933d8a9..2588da233b 100644 --- a/go/genkit/genkit.go +++ b/go/genkit/genkit.go @@ -25,7 +25,6 @@ import ( "sync" "syscall" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" ) @@ -67,7 +66,7 @@ func Init(ctx context.Context, opts *Options) error { var wg sync.WaitGroup errCh := make(chan error, 2) - if common.CurrentEnvironment() == common.EnvironmentDev { + if registry.CurrentEnvironment() == registry.EnvironmentDev { wg.Add(1) go func() { defer wg.Done() diff --git a/go/genkit/servers.go b/go/genkit/servers.go index 02e59b609a..8acf9b2869 100644 --- a/go/genkit/servers.go +++ b/go/genkit/servers.go @@ -41,8 +41,8 @@ import ( "github.com/firebase/genkit/go/core/logger" "github.com/firebase/genkit/go/core/tracing" + "github.com/firebase/genkit/go/internal/action" "github.com/firebase/genkit/go/internal/base" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" "go.opentelemetry.io/otel/trace" ) @@ -76,7 +76,7 @@ type flow interface { // runJSON uses encoding/json to unmarshal the input, // calls Flow.start, then returns the marshaled result. - runJSON(ctx context.Context, input json.RawMessage, cb common.StreamingCallback[json.RawMessage]) (json.RawMessage, error) + runJSON(ctx context.Context, input json.RawMessage, cb streamingCallback[json.RawMessage]) (json.RawMessage, error) } // startServer starts an HTTP server listening on the address. @@ -174,7 +174,7 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro logger.FromContext(ctx).Debug("running action", "key", body.Key, "stream", stream) - var callback common.StreamingCallback[json.RawMessage] + var callback streamingCallback[json.RawMessage] if stream { // Stream results are newline-separated JSON. callback = func(ctx context.Context, msg json.RawMessage) error { @@ -204,7 +204,7 @@ type telemetry struct { TraceID string `json:"traceId"` } -func runAction(ctx context.Context, reg *registry.Registry, key string, input json.RawMessage, cb common.StreamingCallback[json.RawMessage]) (*runActionResponse, error) { +func runAction(ctx context.Context, reg *registry.Registry, key string, input json.RawMessage, cb streamingCallback[json.RawMessage]) (*runActionResponse, error) { action := reg.LookupAction(key) if action == nil { return nil, &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no action with key %q", key)} @@ -227,7 +227,7 @@ func runAction(ctx context.Context, reg *registry.Registry, key string, input js // handleListActions lists all the registered actions. func (s *devServer) handleListActions(w http.ResponseWriter, r *http.Request) error { descs := s.reg.ListActions() - descMap := map[string]common.ActionDesc{} + descMap := map[string]action.Desc{} for _, d := range descs { descMap[d.Key] = d } @@ -237,7 +237,7 @@ func (s *devServer) handleListActions(w http.ResponseWriter, r *http.Request) er // handleGetTrace returns a single trace from a TraceStore. func (s *devServer) handleGetTrace(w http.ResponseWriter, r *http.Request) error { env := r.PathValue("env") - ts := s.reg.LookupTraceStore(common.Environment(env)) + ts := s.reg.LookupTraceStore(registry.Environment(env)) if ts == nil { return &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no TraceStore for environment %q", env)} } @@ -255,7 +255,7 @@ func (s *devServer) handleGetTrace(w http.ResponseWriter, r *http.Request) error // handleListTraces returns a list of traces from a TraceStore. func (s *devServer) handleListTraces(w http.ResponseWriter, r *http.Request) error { env := r.PathValue("env") - ts := s.reg.LookupTraceStore(common.Environment(env)) + ts := s.reg.LookupTraceStore(registry.Environment(env)) if ts == nil { return &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no TraceStore for environment %q", env)} } @@ -287,12 +287,12 @@ type listTracesResult struct { } func (s *devServer) handleListFlowStates(w http.ResponseWriter, r *http.Request) error { - return writeJSON(r.Context(), w, listFlowStatesResult{[]common.FlowStater{}, ""}) + return writeJSON(r.Context(), w, listFlowStatesResult{[]base.FlowStater{}, ""}) } type listFlowStatesResult struct { - FlowStates []common.FlowStater `json:"flowStates"` - ContinuationToken string `json:"continuationToken"` + FlowStates []base.FlowStater `json:"flowStates"` + ContinuationToken string `json:"continuationToken"` } // NewFlowServeMux constructs a [net/http.ServeMux]. diff --git a/go/genkit/servers_test.go b/go/genkit/servers_test.go index 59a29ceba6..e953692520 100644 --- a/go/genkit/servers_test.go +++ b/go/genkit/servers_test.go @@ -25,8 +25,8 @@ import ( "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/core/tracing" + "github.com/firebase/genkit/go/internal/action" "github.com/firebase/genkit/go/internal/atype" - "github.com/firebase/genkit/go/internal/common" "github.com/firebase/genkit/go/internal/registry" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -87,11 +87,11 @@ func TestDevServer(t *testing.T) { if res.StatusCode != 200 { t.Fatalf("got status %d, wanted 200", res.StatusCode) } - got, err := readJSON[map[string]common.ActionDesc](res.Body) + got, err := readJSON[map[string]action.Desc](res.Body) if err != nil { t.Fatal(err) } - want := map[string]common.ActionDesc{ + want := map[string]action.Desc{ "/custom/devServer/inc": { Key: "/custom/devServer/inc", Name: "devServer/inc", @@ -168,7 +168,7 @@ func TestProdServer(t *testing.T) { } func checkActionTrace(t *testing.T, reg *registry.Registry, tid, name string) { - ts := reg.LookupTraceStore(common.EnvironmentDev) + ts := reg.LookupTraceStore(registry.EnvironmentDev) td, err := ts.Load(context.Background(), tid) if err != nil { t.Fatal(err) diff --git a/go/internal/common/common.go b/go/internal/action/action.go similarity index 64% rename from go/internal/common/common.go rename to go/internal/action/action.go index 7feacf9b83..ddd0119b5b 100644 --- a/go/internal/common/common.go +++ b/go/internal/action/action.go @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +package action import ( "context" "encoding/json" - "os" "github.com/firebase/genkit/go/core/tracing" "github.com/invopop/jsonschema" @@ -34,15 +33,15 @@ type Action interface { // Desc returns a description of the action. // It should set all fields of actionDesc except Key, which // the registry will set. - Desc() ActionDesc + Desc() Desc // SetTracingState set's the action's tracing.State. SetTracingState(*tracing.State) } -// An ActionDesc is a description of an Action. +// A Desc is a description of an Action. // It is used to provide a list of registered actions. -type ActionDesc struct { +type Desc struct { Key string `json:"key"` // full key from the registry Name string `json:"name"` Description string `json:"description"` @@ -50,30 +49,3 @@ type ActionDesc struct { InputSchema *jsonschema.Schema `json:"inputSchema"` OutputSchema *jsonschema.Schema `json:"outputSchema"` } - -// An Environment is the execution context in which the program is running. -type Environment string - -const ( - EnvironmentDev Environment = "dev" // development: testing, debugging, etc. - EnvironmentProd Environment = "prod" // production: user data, SLOs, etc. -) - -// CurentEnvironment returns the currently active environment. -func CurrentEnvironment() Environment { - if v := os.Getenv("GENKIT_ENV"); v != "" { - return Environment(v) - } - return EnvironmentProd -} - -// FlowStater is the common type of all flowState[I, O] types. -type FlowStater interface { - IsFlowState() - ToJSON() ([]byte, error) - CacheAt(key string) json.RawMessage - CacheSet(key string, val json.RawMessage) -} - -// StreamingCallback is the type of streaming callbacks. -type StreamingCallback[Stream any] func(context.Context, Stream) error diff --git a/go/internal/base/misc.go b/go/internal/base/misc.go index 52309c47c9..01b0fa8404 100644 --- a/go/internal/base/misc.go +++ b/go/internal/base/misc.go @@ -15,6 +15,7 @@ package base import ( + "encoding/json" "fmt" "net/http" "net/url" @@ -48,3 +49,11 @@ type HTTPError struct { func (e *HTTPError) Error() string { return fmt.Sprintf("%s: %s", http.StatusText(e.Code), e.Err) } + +// FlowStater is the common type of all flowState[I, O] types. +type FlowStater interface { + IsFlowState() + ToJSON() ([]byte, error) + CacheAt(key string) json.RawMessage + CacheSet(key string, val json.RawMessage) +} diff --git a/go/internal/registry/registry.go b/go/internal/registry/registry.go index e338124045..c57fb73319 100644 --- a/go/internal/registry/registry.go +++ b/go/internal/registry/registry.go @@ -25,8 +25,8 @@ import ( "sync" "github.com/firebase/genkit/go/core/tracing" + "github.com/firebase/genkit/go/internal/action" "github.com/firebase/genkit/go/internal/atype" - "github.com/firebase/genkit/go/internal/common" sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/exp/maps" ) @@ -50,24 +50,24 @@ type Registry struct { tstate *tracing.State mu sync.Mutex frozen bool // when true, no more additions - actions map[string]common.Action + actions map[string]action.Action flows []Flow - // TraceStores, at most one for each [common.Environment]. + // TraceStores, at most one for each [Environment]. // Only the prod trace store is actually registered; the dev one is // always created automatically. But it's simpler if we keep them together here. - traceStores map[common.Environment]tracing.Store + traceStores map[Environment]tracing.Store } func New() (*Registry, error) { r := &Registry{ - actions: map[string]common.Action{}, - traceStores: map[common.Environment]tracing.Store{}, + actions: map[string]action.Action{}, + traceStores: map[Environment]tracing.Store{}, } tstore, err := newDevStore() if err != nil { return nil, err } - r.RegisterTraceStore(common.EnvironmentDev, tstore) + r.RegisterTraceStore(EnvironmentDev, tstore) r.tstate = tracing.NewState() r.tstate.AddTraceStoreImmediate(tstore) return r, nil @@ -89,7 +89,7 @@ func newDevStore() (tracing.Store, error) { // RegisterAction records the action in the registry. // It panics if an action with the same type, provider and name is already // registered. -func (r *Registry) RegisterAction(typ atype.ActionType, a common.Action) { +func (r *Registry) RegisterAction(typ atype.ActionType, a action.Action) { key := fmt.Sprintf("/%s/%s", typ, a.Name()) r.mu.Lock() defer r.mu.Unlock() @@ -113,7 +113,7 @@ func (r *Registry) Freeze() { } // LookupAction returns the action for the given key, or nil if there is none. -func (r *Registry) LookupAction(key string) common.Action { +func (r *Registry) LookupAction(key string) action.Action { r.mu.Lock() defer r.mu.Unlock() return r.actions[key] @@ -121,8 +121,8 @@ func (r *Registry) LookupAction(key string) common.Action { // ListActions returns a list of descriptions of all registered actions. // The list is sorted by action name. -func (r *Registry) ListActions() []common.ActionDesc { - var ads []common.ActionDesc +func (r *Registry) ListActions() []action.Desc { + var ads []action.Desc r.mu.Lock() defer r.mu.Unlock() keys := maps.Keys(r.actions) @@ -155,7 +155,7 @@ func (r *Registry) ListFlows() []Flow { return r.flows } -func (r *Registry) RegisterTraceStore(env common.Environment, ts tracing.Store) { +func (r *Registry) RegisterTraceStore(env Environment, ts tracing.Store) { r.mu.Lock() defer r.mu.Unlock() if _, ok := r.traceStores[env]; ok { @@ -164,7 +164,7 @@ func (r *Registry) RegisterTraceStore(env common.Environment, ts tracing.Store) r.traceStores[env] = ts } -func (r *Registry) LookupTraceStore(env common.Environment) tracing.Store { +func (r *Registry) LookupTraceStore(env Environment) tracing.Store { r.mu.Lock() defer r.mu.Unlock() return r.traceStores[env] @@ -173,3 +173,19 @@ func (r *Registry) LookupTraceStore(env common.Environment) tracing.Store { func (r *Registry) RegisterSpanProcessor(sp sdktrace.SpanProcessor) { r.tstate.RegisterSpanProcessor(sp) } + +// An Environment is the execution context in which the program is running. +type Environment string + +const ( + EnvironmentDev Environment = "dev" // development: testing, debugging, etc. + EnvironmentProd Environment = "prod" // production: user data, SLOs, etc. +) + +// CurentEnvironment returns the currently active environment. +func CurrentEnvironment() Environment { + if v := os.Getenv("GENKIT_ENV"); v != "" { + return Environment(v) + } + return EnvironmentProd +}