Commit a6db262

Merge pull request #91 from doringeman/openai-recorder
Add OpenAIRecorder
2 parents 5d31399 + 3a55005 commit a6db262

5 files changed: +313, -14 lines
pkg/inference/backend.go

Lines changed: 2 additions & 2 deletions
@@ -30,8 +30,8 @@ func (m BackendMode) String() string {
 }
 
 type BackendConfiguration struct {
-	ContextSize int64
-	RawFlags    []string
+	ContextSize int64    `json:"context_size,omitempty"`
+	RawFlags    []string `json:"flags,omitempty"`
 }
 
 // Backend is the interface implemented by inference engine backends. Backend
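The new struct tags make a runner's configuration serializable as JSON, presumably so the recorder can include it in its output. A minimal sketch of what the encoding yields (the sample values below are hypothetical):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/docker/model-runner/pkg/inference"
)

func main() {
	// Hypothetical sample values; with omitempty, unset fields are
	// dropped from the output entirely.
	cfg := inference.BackendConfiguration{
		ContextSize: 4096,
		RawFlags:    []string{"--threads", "8"},
	}
	data, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"context_size":4096,"flags":["--threads","8"]}
}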

pkg/inference/scheduling/loader.go

Lines changed: 6 additions & 1 deletion
@@ -13,6 +13,7 @@ import (
 	"github.com/docker/model-runner/pkg/inference"
 	"github.com/docker/model-runner/pkg/inference/models"
 	"github.com/docker/model-runner/pkg/logging"
+	"github.com/docker/model-runner/pkg/metrics"
 )
 
 const (
@@ -92,13 +93,16 @@ type loader struct {
 	timestamps []time.Time
 	// runnerConfigs maps model names to runner configurations
 	runnerConfigs map[runnerKey]inference.BackendConfiguration
+	// openAIRecorder is used to record OpenAI API inference requests and responses.
+	openAIRecorder *metrics.OpenAIRecorder
 }
 
 // newLoader creates a new loader.
 func newLoader(
 	log logging.Logger,
 	backends map[string]inference.Backend,
 	modelManager *models.Manager,
+	openAIRecorder *metrics.OpenAIRecorder,
 ) *loader {
 	// Compute the number of runner slots to allocate. Because of RAM and VRAM
 	// limitations, it's unlikely that we'll ever be able to fully populate
@@ -153,6 +157,7 @@ func newLoader(
 		allocations:    make([]uint64, nSlots),
 		timestamps:     make([]time.Time, nSlots),
 		runnerConfigs:  make(map[runnerKey]inference.BackendConfiguration),
+		openAIRecorder: openAIRecorder,
 	}
 	l.guard <- struct{}{}
 	return l
@@ -462,7 +467,7 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
 	}
 	// Create the runner.
 	l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, model, mode)
-	runner, err := run(l.log, backend, model, mode, slot, runnerConfig)
+	runner, err := run(l.log, backend, model, mode, slot, runnerConfig, l.openAIRecorder)
 	if err != nil {
 		l.log.Warnf("Unable to start %s backend runner with model %s in %s mode: %v",
 			backendName, model, mode, err,

pkg/inference/scheduling/runner.go

Lines changed: 19 additions & 10 deletions
@@ -15,6 +15,7 @@ import (
 
 	"github.com/docker/model-runner/pkg/inference"
 	"github.com/docker/model-runner/pkg/logging"
+	"github.com/docker/model-runner/pkg/metrics"
 )
 
 const (
@@ -63,6 +64,8 @@ type runner struct {
 	proxy *httputil.ReverseProxy
 	// proxyLog is the stream used for logging by proxy.
 	proxyLog io.Closer
+	// openAIRecorder is used to record OpenAI API inference requests and responses.
+	openAIRecorder *metrics.OpenAIRecorder
 	// err is the error returned by the runner's backend, only valid after done is closed.
 	err error
 }
@@ -75,6 +78,7 @@ func run(
 	mode inference.BackendMode,
 	slot int,
 	runnerConfig *inference.BackendConfiguration,
+	openAIRecorder *metrics.OpenAIRecorder,
 ) (*runner, error) {
 	// Create a dialer / transport that target backend on the specified slot.
 	socket, err := RunnerSocketPath(slot)
@@ -124,16 +128,17 @@ func run(
 	runDone := make(chan struct{})
 
 	r := &runner{
-		log:       log,
-		backend:   backend,
-		model:     model,
-		mode:      mode,
-		cancel:    runCancel,
-		done:      runDone,
-		transport: transport,
-		client:    client,
-		proxy:     proxy,
-		proxyLog:  proxyLog,
+		log:            log,
+		backend:        backend,
+		model:          model,
+		mode:           mode,
+		cancel:         runCancel,
+		done:           runDone,
+		transport:      transport,
+		client:         client,
+		proxy:          proxy,
+		proxyLog:       proxyLog,
+		openAIRecorder: openAIRecorder,
 	}
 
 	proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) {
@@ -164,6 +169,8 @@ func run(
 		}
 	}
 
+	r.openAIRecorder.SetConfigForModel(model, runnerConfig)
+
 	// Start the backend run loop.
 	go func() {
 		if err := backend.Run(runCtx, socket, model, mode, runnerConfig); err != nil {
@@ -236,6 +243,8 @@ func (r *runner) terminate() {
 	if err := r.proxyLog.Close(); err != nil {
 		r.log.Warnf("Unable to close reverse proxy log writer: %v", err)
 	}
+
+	r.openAIRecorder.RemoveModel(r.model)
 }
 
 // ServeHTTP implements net/http.Handler.ServeHTTP. It forwards requests to the
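The OpenAIRecorder implementation itself lives in pkg/metrics (presumably the fifth changed file, which this diff view does not show). The calls above imply per-model state that is set when a runner starts and dropped when it terminates. A minimal sketch of that lifecycle half, assuming a mutex-guarded map; the type and field names here are hypothetical, only the method names come from the diff:

package metrics

import (
	"sync"

	"github.com/docker/model-runner/pkg/inference"
)

// openAIRecorderSketch is a hypothetical stand-in for OpenAIRecorder,
// showing only the model-lifecycle methods called from runner.go.
type openAIRecorderSketch struct {
	mu      sync.Mutex
	configs map[string]*inference.BackendConfiguration
}

// newOpenAIRecorderSketch initializes the map; the real constructor
// (metrics.NewOpenAIRecorder) also takes a logger.
func newOpenAIRecorderSketch() *openAIRecorderSketch {
	return &openAIRecorderSketch{
		configs: make(map[string]*inference.BackendConfiguration),
	}
}

// SetConfigForModel stores the runner configuration under the model
// name, as called when a runner starts.
func (r *openAIRecorderSketch) SetConfigForModel(model string, cfg *inference.BackendConfiguration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.configs[model] = cfg
}

// RemoveModel drops state for a model, as called from runner.terminate.
func (r *openAIRecorderSketch) RemoveModel(model string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.configs, model)
}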

pkg/inference/scheduling/scheduler.go

Lines changed: 15 additions & 1 deletion
@@ -40,6 +40,8 @@ type Scheduler struct {
 	router *http.ServeMux
 	// tracker is the metrics tracker.
 	tracker *metrics.Tracker
+	// openAIRecorder is used to record OpenAI API inference requests and responses.
+	openAIRecorder *metrics.OpenAIRecorder
 	// lock is used to synchronize access to the scheduler's router.
 	lock sync.Mutex
 }
@@ -54,16 +56,19 @@ func NewScheduler(
 	allowedOrigins []string,
 	tracker *metrics.Tracker,
 ) *Scheduler {
+	openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"))
+
 	// Create the scheduler.
 	s := &Scheduler{
 		log:            log,
 		backends:       backends,
 		defaultBackend: defaultBackend,
 		modelManager:   modelManager,
 		installer:      newInstaller(log, backends, httpClient),
-		loader:         newLoader(log, backends, modelManager),
+		loader:         newLoader(log, backends, modelManager, openAIRecorder),
 		router:         http.NewServeMux(),
 		tracker:        tracker,
+		openAIRecorder: openAIRecorder,
 	}
 
 	// Register routes.
@@ -115,6 +120,7 @@ func (s *Scheduler) routeHandlers(allowedOrigins []string) map[string]http.Handl
 	m["POST "+inference.InferencePrefix+"/unload"] = s.Unload
 	m["POST "+inference.InferencePrefix+"/{backend}/_configure"] = s.Configure
 	m["POST "+inference.InferencePrefix+"/_configure"] = s.Configure
+	m["GET "+inference.InferencePrefix+"/requests"] = s.openAIRecorder.GetRecordsByModelHandler()
 	return m
 }
@@ -232,6 +238,14 @@ func (s *Scheduler) handleOpenAIInference(w http.ResponseWriter, r *http.Request
 		s.tracker.TrackModel(model)
 	}
 
+	// Record the request in the OpenAI recorder.
+	recordID := s.openAIRecorder.RecordRequest(request.Model, r, body)
+	w = s.openAIRecorder.NewResponseRecorder(w)
+	defer func() {
+		// Record the response in the OpenAI recorder.
+		s.openAIRecorder.RecordResponse(recordID, request.Model, w)
+	}()
+
 	// Request a runner to execute the request and defer its release.
 	runner, err := s.loader.load(r.Context(), backend.Name(), request.Model, backendMode)
 	if err != nil {
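The handler changes capture both sides of the exchange: the request body is recorded up front, the http.ResponseWriter is swapped for a recording wrapper, and the deferred RecordResponse flushes whatever was written once the proxied inference completes. The recorded exchanges then become queryable per model via the new GET {InferencePrefix}/requests route. The wrapper below is a minimal sketch of that tee pattern, not the actual NewResponseRecorder implementation (which this excerpt omits):

package metrics

import (
	"bytes"
	"net/http"
)

// responseRecorderSketch is a hypothetical illustration of what
// NewResponseRecorder might return: an http.ResponseWriter that
// passes writes through while keeping a copy for later recording.
type responseRecorderSketch struct {
	http.ResponseWriter
	status int
	body   bytes.Buffer
}

// WriteHeader remembers the status code before forwarding it.
func (rr *responseRecorderSketch) WriteHeader(status int) {
	rr.status = status
	rr.ResponseWriter.WriteHeader(status)
}

// Write tees the response body into the buffer while forwarding it
// to the underlying writer.
func (rr *responseRecorderSketch) Write(p []byte) (int, error) {
	rr.body.Write(p)
	return rr.ResponseWriter.Write(p)
}

A production wrapper would also need to forward http.Flusher so that streamed (SSE) completions keep flushing through the proxy; RecordResponse can then read the buffered status and body back out of the wrapper.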
