Skip to content

Commit e462533

Browse files
committed
chore: warning
1 parent 1514397 commit e462533

File tree

3 files changed

+148
-1
lines changed

3 files changed

+148
-1
lines changed

cli/server.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,24 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
11571157
cliui.Errorf(inv.Stderr, "Notify systemd failed: %s", err)
11581158
}
11591159

1160+
// Stop accepting new connections to aibridged.
1161+
//
1162+
// When running as an in-memory daemon, the HTTP handler is wired into the
1163+
// coderd API and therefore is subject to its context. Calling shutdown on
1164+
// aibridged will NOT affect in-flight requests but those will be closed once
1165+
// the API server is shutdown below.
1166+
if current := coderAPI.AIBridgeDaemon.Load(); current != nil {
1167+
cliui.Info(inv.Stdout, "Shutting down aibridge daemon...\n")
1168+
1169+
err = shutdownWithTimeout((*current).Shutdown, 5*time.Second)
1170+
if err != nil {
1171+
cliui.Errorf(inv.Stderr, "Graceful shutdown of aibridge daemon failed: %s\n", err)
1172+
} else {
1173+
_ = (*current).Close()
1174+
cliui.Info(inv.Stdout, "Gracefully shut down aibridge daemon\n")
1175+
}
1176+
}
1177+
11601178
// Stop accepting new connections without interrupting
11611179
// in-flight requests, give in-flight requests 5 seconds to
11621180
// complete.

coderd/coderd.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"sync/atomic"
2121
"time"
2222

23+
"github.com/coder/coder/v2/aibridged"
24+
aibridgedproto "github.com/coder/coder/v2/aibridged/proto"
2325
"github.com/coder/coder/v2/coderd/oauth2provider"
2426
"github.com/coder/coder/v2/coderd/pproflabel"
2527
"github.com/coder/coder/v2/coderd/prebuilds"
@@ -44,6 +46,9 @@ import (
4446
"tailscale.com/types/key"
4547
"tailscale.com/util/singleflight"
4648

49+
"github.com/coder/coder/v2/coderd/aibridgedserver"
50+
"github.com/coder/coder/v2/provisionerd/proto"
51+
4752
"cdr.dev/slog"
4853
"github.com/coder/quartz"
4954
"github.com/coder/serpent"
@@ -95,7 +100,6 @@ import (
95100
"github.com/coder/coder/v2/coderd/workspacestats"
96101
"github.com/coder/coder/v2/codersdk"
97102
"github.com/coder/coder/v2/codersdk/healthsdk"
98-
"github.com/coder/coder/v2/provisionerd/proto"
99103
"github.com/coder/coder/v2/provisionersdk"
100104
"github.com/coder/coder/v2/site"
101105
"github.com/coder/coder/v2/tailnet"
@@ -632,6 +636,7 @@ func New(options *Options) *API {
632636
api.PortSharer.Store(&portsharing.DefaultPortSharer)
633637
api.PrebuildsClaimer.Store(&prebuilds.DefaultClaimer)
634638
api.PrebuildsReconciler.Store(&prebuilds.DefaultReconciler)
639+
api.AIBridgeDaemon.Store(&aibridged.DefaultServer)
635640
buildInfo := codersdk.BuildInfoResponse{
636641
ExternalURL: buildinfo.ExternalURL(),
637642
Version: buildinfo.Version(),
@@ -1767,6 +1772,8 @@ type API struct {
17671772
// dbRolluper rolls up template usage stats from raw agent and app
17681773
// stats. This is used to provide insights in the WebUI.
17691774
dbRolluper *dbrollup.Rolluper
1775+
1776+
AIBridgeDaemon atomic.Pointer[aibridged.Server]
17701777
}
17711778

17721779
// Close waits for all WebSocket connections to drain before returning.
@@ -1825,6 +1832,10 @@ func (api *API) Close() error {
18251832
(*current).Stop(ctx, nil)
18261833
}
18271834

1835+
if current := api.AIBridgeDaemon.Load(); current != nil {
1836+
_ = (*current).Close()
1837+
}
1838+
18281839
return nil
18291840
}
18301841

@@ -1998,6 +2009,76 @@ func (api *API) CreateInMemoryTaggedProvisionerDaemon(dialCtx context.Context, n
19982009
return proto.NewDRPCProvisionerDaemonClient(clientSession), nil
19992010
}
20002011

2012+
func (api *API) CreateInMemoryAIBridgeDaemon(dialCtx context.Context) (client aibridged.DRPCClient, err error) {
2013+
// TODO(dannyk): implement options.
2014+
// TODO(dannyk): implement tracing.
2015+
// TODO(dannyk): implement API versioning.
2016+
2017+
clientSession, serverSession := drpcsdk.MemTransportPipe()
2018+
defer func() {
2019+
if err != nil {
2020+
_ = clientSession.Close()
2021+
_ = serverSession.Close()
2022+
}
2023+
}()
2024+
2025+
mux := drpcmux.New()
2026+
api.Logger.Debug(dialCtx, "starting in-memory aibridge daemon")
2027+
logger := api.Logger.Named("inmem-aibridged")
2028+
srv, err := aibridgedserver.NewServer(api.ctx, api.Database, logger,
2029+
api.DeploymentValues.AccessURL.String(), api.ExternalAuthConfigs)
2030+
if err != nil {
2031+
return nil, err
2032+
}
2033+
err = aibridgedproto.DRPCRegisterRecorder(mux, srv)
2034+
if err != nil {
2035+
return nil, xerrors.Errorf("register recorder service: %w", err)
2036+
}
2037+
err = aibridgedproto.DRPCRegisterMCPConfigurator(mux, srv)
2038+
if err != nil {
2039+
return nil, xerrors.Errorf("register MCP configurator service: %w", err)
2040+
}
2041+
err = aibridgedproto.DRPCRegisterAuthenticator(mux, srv)
2042+
if err != nil {
2043+
return nil, xerrors.Errorf("register authenticator service: %w", err)
2044+
}
2045+
server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
2046+
drpcserver.Options{
2047+
Manager: drpcsdk.DefaultDRPCOptions(nil),
2048+
Log: func(err error) {
2049+
if xerrors.Is(err, io.EOF) {
2050+
return
2051+
}
2052+
logger.Debug(dialCtx, "drpc server error", slog.Error(err))
2053+
},
2054+
},
2055+
)
2056+
// in-mem pipes aren't technically "websockets" but they have the same properties as far as the
2057+
// API is concerned: they are long-lived connections that we need to close before completing
2058+
// shutdown of the API.
2059+
api.WebsocketWaitMutex.Lock()
2060+
api.WebsocketWaitGroup.Add(1)
2061+
api.WebsocketWaitMutex.Unlock()
2062+
go func() {
2063+
defer api.WebsocketWaitGroup.Done()
2064+
// Here we pass the background context, since we want the server to keep serving until the
2065+
// client hangs up. The aibridged is local, in-mem, so there isn't a danger of losing contact with it and
2066+
// having a dead connection we don't know the status of.
2067+
err := server.Serve(context.Background(), serverSession)
2068+
logger.Info(dialCtx, "aibridge daemon disconnected", slog.Error(err))
2069+
// Close the sessions, so we don't leak goroutines serving them.
2070+
_ = clientSession.Close()
2071+
_ = serverSession.Close()
2072+
}()
2073+
2074+
return &aibridged.Client{
2075+
Conn: clientSession,
2076+
DRPCRecorderClient: aibridgedproto.NewDRPCRecorderClient(clientSession),
2077+
DRPCMCPConfiguratorClient: aibridgedproto.NewDRPCMCPConfiguratorClient(clientSession),
2078+
DRPCAuthenticatorClient: aibridgedproto.NewDRPCAuthenticatorClient(clientSession),
2079+
}, nil
2080+
}
2081+
20012082
func (api *API) DERPMap() *tailcfg.DERPMap {
20022083
fn := api.DERPMapper.Load()
20032084
if fn != nil {

enterprise/coderd/coderd.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/coder/coder/v2/aibridged"
1617
"github.com/coder/coder/v2/buildinfo"
1718
"github.com/coder/coder/v2/coderd/appearance"
1819
"github.com/coder/coder/v2/coderd/database"
@@ -61,6 +62,8 @@ import (
6162
"github.com/coder/coder/v2/enterprise/tailnet"
6263
"github.com/coder/coder/v2/provisionerd/proto"
6364
agpltailnet "github.com/coder/coder/v2/tailnet"
65+
66+
"github.com/coder/aibridge"
6467
)
6568

6669
// New constructs an Enterprise coderd API instance.
@@ -617,6 +620,23 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
617620
return nil, xerrors.Errorf("unable to register license metrics collector")
618621
}
619622

623+
// In-memory aibridge daemons.
624+
if api.DeploymentValues.AI.BridgeConfig.Enabled {
625+
if api.AGPL.Experiments.Enabled(codersdk.ExperimentAIBridge) {
626+
srv, err := newAIBridgeServer(api.AGPL)
627+
if err != nil {
628+
return nil, xerrors.Errorf("create aibridged: %w", err)
629+
}
630+
api.AGPL.AIBridgeDaemon.Store(&srv)
631+
} else {
632+
api.Logger.Warn(ctx, fmt.Sprintf("CODER_AIBRIDGE_ENABLED=true but experiment %q not enabled", codersdk.ExperimentAIBridge))
633+
}
634+
} else {
635+
if api.AGPL.Experiments.Enabled(codersdk.ExperimentAIBridge) {
636+
api.Logger.Warn(ctx, "aibridge experiment enabled but CODER_AIBRIDGE_ENABLED=false")
637+
}
638+
}
639+
620640
err = api.updateEntitlements(ctx)
621641
if err != nil {
622642
return nil, xerrors.Errorf("update entitlements: %w", err)
@@ -1275,3 +1295,31 @@ func (api *API) setupPrebuilds(featureEnabled bool) (agplprebuilds.Reconciliatio
12751295
api.Logger.Named("prebuilds"), quartz.NewReal(), api.PrometheusRegistry, api.NotificationsEnqueuer, api.AGPL.BuildUsageChecker)
12761296
return reconciler, prebuilds.NewEnterpriseClaimer(api.Database)
12771297
}
1298+
1299+
func newAIBridgeServer(coderAPI *coderd.API) (aibridged.Server, error) {
1300+
srv, err := aibridged.New(
1301+
func(dialCtx context.Context) (aibridged.DRPCClient, error) {
1302+
return coderAPI.CreateInMemoryAIBridgeDaemon(dialCtx)
1303+
},
1304+
convertAIBridgeDeploymentValues(coderAPI.DeploymentValues.AI.BridgeConfig),
1305+
coderAPI.Logger.Named("aibridged"),
1306+
)
1307+
if err != nil {
1308+
return nil, xerrors.Errorf("create aibridge daemon: %w", err)
1309+
}
1310+
return srv, nil
1311+
}
1312+
1313+
func convertAIBridgeDeploymentValues(vals codersdk.AIBridgeConfig) aibridge.Config {
1314+
return aibridge.Config{
1315+
OpenAI: aibridge.ProviderConfig{
1316+
BaseURL: vals.OpenAI.BaseURL.String(),
1317+
Key: vals.OpenAI.Key.String(),
1318+
},
1319+
Anthropic: aibridge.ProviderConfig{
1320+
BaseURL: vals.Anthropic.BaseURL.String(),
1321+
Key: vals.Anthropic.Key.String(),
1322+
},
1323+
CacheSize: 100, // TODO: configurable.
1324+
}
1325+
}

0 commit comments

Comments
 (0)