Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/scorecard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ on:
- cron: "0 6 * * 1"
workflow_dispatch:

# Restrict the default GITHUB_TOKEN to read-only; the steps below request the
# narrow scopes they actually need.
permissions: read-all
# Workflow-level default: minimum needed for actions/checkout. The analysis
# job declares the additional narrow scopes (security-events: write, id-token:
# write, actions: read) it actually needs.
permissions:
contents: read

jobs:
analysis:
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ on:
schedule:
- cron: '21 4 * * 1' # Mondays 04:21 UTC — catch newly-disclosed CVEs

permissions: read-all
# Workflow-level default: minimum needed for actions/checkout. Each job
# declares the additional narrow scopes (security-events: write for SARIF
# uploaders, contents: write for SBOM artifact upload) it actually needs.
permissions:
contents: read

jobs:
osv-scanner:
Expand Down
68 changes: 33 additions & 35 deletions internal/ingest/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,11 @@ func TestPipeline_PreservesInsertionOrder(t *testing.T) {
if err := p.Submit(healthyBatch()); err != nil {
t.Fatalf("submit: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 1 }) {
t.Fatalf("worker did not process batch within deadline")
// Sync on the assertion target β€” the per-signal call sequence β€” rather
// than Stats().Processed, which can bump between BatchCreate calls under
// the race detector and trip the length check on a partial slice.
if !waitFor(t, 5*time.Second, func() bool { return len(w.snapshotOrder()) >= 3 }) {
t.Fatalf("worker did not record 3 calls within deadline (got %v)", w.snapshotOrder())
}

got := w.snapshotOrder()
Expand Down Expand Up @@ -273,17 +276,18 @@ func TestPipeline_CallbacksFireAfterPersistence(t *testing.T) {
if err := p.Submit(b); err != nil {
t.Fatalf("submit: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return spanHits.Load() == 1 && logHits.Load() == 1 }) {
if !waitFor(t, 5*time.Second, func() bool { return spanHits.Load() == 1 && logHits.Load() == 1 }) {
t.Fatalf("callbacks did not fire (span=%d log=%d, want 1/1)", spanHits.Load(), logHits.Load())
}
}

func TestPipeline_FailedSpansSkipsLogs(t *testing.T) {
// When BatchCreateSpans fails, BatchCreateLogs must NOT run for that
// batch β€” preserves the invariant that orphan logs aren't persisted
// without their span. Mirrors the synchronous path's behavior of
// returning the span error before log insert.
w := &fakeWriter{spanErr: errors.New("span db down")}
// runFailureSkipsCheck wires up a 1-worker pipeline with the configured
// fakeWriter, submits a healthy batch, waits for the failure to surface,
// then asserts that none of the forbidden BatchCreate* calls fired.
// Shared by the trace-fails and span-fails skip tests so the boilerplate
// (pipeline lifecycle + waitFor) lives in one place.
func runFailureSkipsCheck(t *testing.T, w *fakeWriter, forbidden ...string) {
t.Helper()
p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1})
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand All @@ -293,41 +297,33 @@ func TestPipeline_FailedSpansSkipsLogs(t *testing.T) {
if err := p.Submit(healthyBatch()); err != nil {
t.Fatalf("submit: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().ProcessFailures > 0 }) {
if !waitFor(t, 5*time.Second, func() bool { return p.Stats().ProcessFailures > 0 }) {
t.Fatalf("expected ProcessFailures > 0, got %d", p.Stats().ProcessFailures)
}
calls := w.snapshotOrder()
for _, c := range calls {
if c == "logs" {
t.Fatalf("BatchCreateLogs ran after spans failed β€” order=%v", calls)
for _, f := range forbidden {
if c == f {
t.Fatalf("%s ran after upstream failure β€” order=%v", f, calls)
}
}
}
}

func TestPipeline_FailedSpansSkipsLogs(t *testing.T) {
// When BatchCreateSpans fails, BatchCreateLogs must NOT run for that
// batch β€” preserves the invariant that orphan logs aren't persisted
// without their span. Mirrors the synchronous path's behavior of
// returning the span error before log insert.
runFailureSkipsCheck(t, &fakeWriter{spanErr: errors.New("span db down")}, "logs")
}

func TestPipeline_FailedTracesAbortsBatch(t *testing.T) {
// Trace failures roll the entire batch back β€” atomic batches are the
// fix for orphan FK rows when a worker crashes between BatchCreate*
// calls. Spans and logs must NOT be persisted when the trace insert
// fails. Counterpart of TestPipeline_FailedSpansSkipsLogs.
w := &fakeWriter{traceErr: errors.New("transient")}
p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1})
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
p.Start(ctx)
t.Cleanup(p.Stop)

if err := p.Submit(healthyBatch()); err != nil {
t.Fatalf("submit: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().ProcessFailures > 0 }) {
t.Fatalf("expected ProcessFailures > 0, got %d", p.Stats().ProcessFailures)
}
calls := w.snapshotOrder()
for _, c := range calls {
if c == "spans" || c == "logs" {
t.Fatalf("spans/logs ran after trace failure β€” order=%v", calls)
}
}
runFailureSkipsCheck(t, &fakeWriter{traceErr: errors.New("transient")}, "spans", "logs")
}

func TestPipeline_DrainsOnStop(t *testing.T) {
Expand Down Expand Up @@ -493,15 +489,17 @@ func TestPipeline_PerTenantCap_ReleasedAfterProcess(t *testing.T) {
if err := p.Submit(mk()); err != nil {
t.Fatalf("submit 1: %v", err)
}
// Wait for the worker to drain it (and release the slot).
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 1 }) {
// Wait for the worker to drain it (and release the slot). 5s tolerates
// the race detector's overhead on slow CI runners β€” the test passes
// locally in milliseconds.
if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed == 1 }) {
t.Fatalf("worker did not process first batch")
}
// Second batch must succeed because the slot was released.
if err := p.Submit(mk()); err != nil {
t.Fatalf("submit 2 after release: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 2 }) {
if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed == 2 }) {
t.Fatalf("worker did not process second batch")
}
if got := p.TenantDropped(); got != 0 {
Expand Down Expand Up @@ -542,7 +540,7 @@ func TestPipeline_PanicInCallbackRecovered(t *testing.T) {
if err := p.Submit(good); err != nil {
t.Fatalf("submit good: %v", err)
}
if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed >= 2 }) {
if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 2 }) {
t.Fatalf("worker did not survive callback panic β€” Processed=%d", p.Stats().Processed)
}
if p.Stats().ProcessFailures == 0 {
Expand Down
49 changes: 33 additions & 16 deletions internal/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ui

import (
"embed"
"errors"
"fmt"
"io/fs"
"net/http"
Expand All @@ -13,6 +14,33 @@ import (
"github.com/RandomCodeSpace/otelcontext/internal/vectordb"
)

// spaFS adapts an fs.FS for single-page-app serving: opening an
// extensionless path that does not exist falls back to index.html so
// the React router can own client-side URLs, while asset-shaped names
// (anything containing a ".") keep their normal not-found behavior —
// a missing /favicon.ico must not come back as an HTML body.
//
// Putting the fallback in Open — behind http.FileServer — rather than
// in a custom handler reading r.URL.Path keeps the user-controlled
// name inside the stdlib boundary, where path.Clean has already run.
type spaFS struct{ fs.FS }

// Open returns the named file, or index.html when the name is an
// extensionless path that does not resolve to a real file.
func (s spaFS) Open(name string) (fs.File, error) {
	f, err := s.FS.Open(name)
	switch {
	case err == nil:
		// Real file (or directory): serve it as-is.
		return f, nil
	case !errors.Is(err, fs.ErrNotExist):
		// Unexpected failure (permissions, I/O): surface it unchanged.
		return nil, err
	case strings.Contains(name, "."):
		// Asset-shaped path: let the legitimate 404 propagate.
		return nil, err
	default:
		// Extensionless client-side route: hand back the SPA shell.
		return s.FS.Open("index.html")
	}
}

//go:embed static/* dist
var content embed.FS

Expand Down Expand Up @@ -46,26 +74,15 @@ func (s *Server) SetMCPConfig(enabled bool, path string) {
func (s *Server) RegisterRoutes(mux *http.ServeMux) error {
mux.Handle("/static/", http.FileServer(http.FS(content)))

// Serve React SPA from dist/ for all non-API paths.
// API routes are registered before this is called, so they take priority.
// Serve React SPA from dist/ for all non-API paths. API routes are
// registered before this is called, so they take priority. spaFS
// converts extensionless 404s into index.html so the React router
// can claim them.
distFS, err := fs.Sub(content, "dist")
if err != nil {
return fmt.Errorf("ui: failed to create dist sub-fs: %w", err)
}
fileServer := http.FileServer(http.FS(distFS))
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// Try the file as-is; if not found, fall back to index.html (SPA routing).
f, openErr := distFS.Open(strings.TrimPrefix(r.URL.Path, "/"))
if openErr == nil {
_ = f.Close()
fileServer.ServeHTTP(w, r)
return
}
// SPA fallback β€” let the React router handle the path.
r2 := r.Clone(r.Context())
r2.URL.Path = "/"
fileServer.ServeHTTP(w, r2)
})
mux.Handle("/", http.FileServer(http.FS(spaFS{distFS})))

return nil
}
Loading