Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions internal/handlers/logs_resourcelogs_twinlogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package handlers_test

// logs_resourcelogs_twinlogs_test.go — covers the error/edge arms of
// LogsHandler.ResourceLogs (logs.go) that logs_coverage_test.go leaves open:
//
// logs.go:157-158 — lookup_failed: GetResourceByToken returns a non-NotFound
// error (driven with a closed DB).
// logs.go:194-196 — tail clamp: ?tail=0 (n<1) clamps up to 1.
// logs.go:206-211 — pods_unavailable: the pod List call returns an error
// (driven with a PrependReactor on the fake clientset).
//
// The clientset is the in-memory k8s fake (SetClientset seam), so these run
// under CI's postgres-only matrix without a live cluster.

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sfake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

"instant.dev/internal/handlers"
"instant.dev/internal/testhelpers"
)

// TestLogs_LookupFailed_503 drives logs.go:157-158: a DB error (not a
// not-found) on GetResourceByToken returns 503 lookup_failed. We build the
// handler against a CLOSED *sql.DB so the query fails with a driver error that
// is NOT *models.ErrResourceNotFound.
func TestLogs_LookupFailed_503(t *testing.T) {
db, _ := testhelpers.SetupTestDB(t)
h := handlers.NewLogsHandler(db)
h.SetClientset(k8sfake.NewSimpleClientset())
// Close the DB now so GetResourceByToken's query returns a driver error
// (sql.ErrConnDone) — NOT a *models.ErrResourceNotFound — driving the
// lookup_failed 503 arm rather than the not_found 404 arm.
require.NoError(t, db.Close())

app := logsTestApp(t, db, h)
// A syntactically valid UUID so we pass the parse gate and reach the lookup.
resp := logsGet(t, app, "11111111-1111-1111-1111-111111111111", "")
defer resp.Body.Close()
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
}

// TestLogs_TailClampLow_StreamsSSE drives logs.go:194-196: ?tail=0 (n<1) clamps
// up to 1 and the happy path still streams. Needs a pod in the fake clientset.
func TestLogs_TailClampLow_StreamsSSE(t *testing.T) {
db, clean := testhelpers.SetupTestDB(t)
defer clean()

const ns = "ns-clamp-low"
cs := k8sfake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-0",
Namespace: ns,
Labels: map[string]string{"app": "postgres"},
},
})
h := handlers.NewLogsHandler(db)
h.SetClientset(cs)
app := logsTestApp(t, db, h)

token := seedLogsResource(t, db, "postgres", "growth", "active", ns)
resp := logsGet(t, app, token, "tail=0") // n<1 → clamp to 1
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
}

// TestLogs_ListPodsError_503 drives logs.go:206-211: the pod List call errors.
// A PrependReactor on the fake clientset makes List("pods") return an error so
// the pods_unavailable arm runs (distinct from the empty-list pod_not_found arm
// already covered).
func TestLogs_ListPodsError_503(t *testing.T) {
db, clean := testhelpers.SetupTestDB(t)
defer clean()

cs := k8sfake.NewSimpleClientset()
cs.PrependReactor("list", "pods",
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, errors.New("apiserver unreachable")
})
h := handlers.NewLogsHandler(db)
h.SetClientset(cs)
app := logsTestApp(t, db, h)

token := seedLogsResource(t, db, "postgres", "growth", "active", "ns-list-err")
resp := logsGet(t, app, token, "")
defer resp.Body.Close()
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
}
93 changes: 93 additions & 0 deletions internal/handlers/logs_streamerr_twinlogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package handlers_test

// logs_streamerr_twinlogs_test.go — covers the LAST uncovered arm of
// LogsHandler.ResourceLogs (logs.go:230-236): req.Stream(streamCtx) returns an
// error, so the handler logs stream_failed, cancels the background context, and
// returns 503 stream_failed.
//
// The vanilla k8s fake clientset's GetLogs always returns a request whose
// Stream succeeds with a canned "fake logs" body, so the error arm is
// unreachable through it. We wrap the fake in a thin kubernetes.Interface that
// delegates everything (so pod LIST still succeeds and we reach the GetLogs
// step) EXCEPT pod GetLogs, which we override to return a request backed by a
// rest/fake.RESTClient whose Err is set — making Stream(ctx) fail
// deterministically.

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
restfake "k8s.io/client-go/rest/fake"

"instant.dev/internal/handlers"
"instant.dev/internal/testhelpers"

"errors"
)

// streamErrClientset wraps a real fake clientset; only pod GetLogs is altered so
// its returned request fails on Stream.
type streamErrClientset struct {
kubernetes.Interface
}

func (c *streamErrClientset) CoreV1() typedcorev1.CoreV1Interface {
return &streamErrCoreV1{c.Interface.CoreV1()}
}

type streamErrCoreV1 struct {
typedcorev1.CoreV1Interface
}

func (c *streamErrCoreV1) Pods(namespace string) typedcorev1.PodInterface {
return &streamErrPods{c.CoreV1Interface.Pods(namespace)}
}

type streamErrPods struct {
typedcorev1.PodInterface
}

// GetLogs returns a request whose Stream(ctx) errors — backed by a
// rest/fake.RESTClient with Err set.
func (p *streamErrPods) GetLogs(name string, opts *corev1.PodLogOptions) *restclient.Request {
rc := &restfake.RESTClient{
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
GroupVersion: schema.GroupVersion{Version: "v1"},
Err: errors.New("log stream upstream unavailable"),
}
return rc.Request()
}

func TestLogs_StreamFailed_503(t *testing.T) {
db, clean := testhelpers.SetupTestDB(t)
defer clean()

const ns = "ns-stream-err"
// A matching pod so the LIST step succeeds and we reach GetLogs/Stream.
base := k8sfake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "postgres-0",
Namespace: ns,
Labels: map[string]string{"app": "postgres"},
},
})
cs := &streamErrClientset{Interface: base}

h := handlers.NewLogsHandler(db)
h.SetClientset(cs)
app := logsTestApp(t, db, h)

token := seedLogsResource(t, db, "postgres", "growth", "active", ns)
resp := logsGet(t, app, token, "")
defer resp.Body.Close()
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
}
85 changes: 85 additions & 0 deletions internal/handlers/sse_logs_writeerr_twinlogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package handlers

// sse_logs_writeerr_twinlogs_test.go — covers the two write-error early-return
// arms of streamLogsSSE (sse_logs.go) that the existing sse_logs_test.go leaves
// open because its failingWriter only ever surfaces the error at the *Flush*
// call (line 64), never at the WriteString call itself:
//
// sse_logs.go:61-63 — WriteString of a data line returns an error → return.
// sse_logs.go:72-74 — WriteString of the end marker returns an error → return.
//
// A bufio.Writer's WriteString only returns an error when an internal flush
// (forced when its buffer fills) hits the underlying writer's error. The
// existing tests use the default 4 KiB buffer, so the small SSE lines never
// force a mid-WriteString flush — the error always lands on the explicit
// w.Flush() instead. Wrapping an immediately-failing writer in a size-1 bufio
// buffer forces the flush to happen *inside* WriteString, surfacing the error
// at lines 61 and 72.

import (
"bufio"
"strings"
"testing"
)

// alwaysFailWriter fails on the very first Write — modelling a fasthttp client
// that disconnected before any byte landed.
type alwaysFailWriter struct{ writes int }

func (a *alwaysFailWriter) Write(p []byte) (int, error) {
a.writes++
return 0, errWriteClosed
}

// errWriteClosed is a sentinel write error (kept as a package-level var so the
// closure above stays allocation-free and the intent is named).
var errWriteClosed = &writeClosedError{}

type writeClosedError struct{}

func (*writeClosedError) Error() string { return "writer closed" }

// TestStreamLogsSSE_DataWriteStringError_BreaksPump drives sse_logs.go:61-63:
// the WriteString of a data line returns an error (not just the later Flush),
// so the pump returns immediately and the deferred Close + cancel still run.
func TestStreamLogsSSE_DataWriteStringError_BreaksPump(t *testing.T) {
stream := &trackedStream{Reader: strings.NewReader("a line that exceeds one byte\nsecond line\n")}
// size-1 buffer → the first WriteString forces an internal flush mid-write,
// surfacing the underlying writer error from WriteString itself (line 61),
// not from the explicit Flush (line 64).
fw := &alwaysFailWriter{}
w := bufio.NewWriterSize(fw, 1)

cancelled := false
streamLogsSSE(w, stream, func() { cancelled = true })

if stream.closes != 1 {
t.Errorf("stream Close called %d times after WriteString error, want 1", stream.closes)
}
if !cancelled {
t.Error("cancel not invoked after data-line WriteString error")
}
}

// TestStreamLogsSSE_EndMarkerWriteStringError drives sse_logs.go:72-74: an empty
// stream writes no data lines, then the end-marker WriteString hits the failing
// underlying writer (via the size-1 buffer flush) and returns — exercising the
// end-marker write-error branch. Teardown (Close + cancel) still runs via defer.
func TestStreamLogsSSE_EndMarkerWriteStringError(t *testing.T) {
stream := &trackedStream{Reader: strings.NewReader("")} // no data lines
fw := &alwaysFailWriter{}
w := bufio.NewWriterSize(fw, 1)

cancelled := false
streamLogsSSE(w, stream, func() { cancelled = true })

if fw.writes == 0 {
t.Error("end-marker WriteString did not reach the underlying writer")
}
if stream.closes != 1 {
t.Errorf("stream Close called %d times after end-marker write error, want 1", stream.closes)
}
if !cancelled {
t.Error("cancel not invoked after end-marker WriteString error")
}
}
112 changes: 112 additions & 0 deletions internal/handlers/twin_storage_exceeded_twinlogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package handlers_test

// twin_storage_exceeded_twinlogs_test.go — covers the `if res.StorageExceeded`
// warning arm of the three twin renderers:
//
// db.go:574 DBHandler.ProvisionForTwin
// cache.go:502 CacheHandler.ProvisionForTwin
// nosql.go:507 NoSQLHandler.ProvisionForTwin
//
// That arm sets resp["warning"] + the X-Instant-Notice header, and is reachable
// only when ProvisionForTwinCore returns StorageExceeded=true — a state that
// requires a freshly-twinned resource to already exceed its tier's storage cap.
// The checkStorageQuota seam (seams.go, driven by forceStorageExceeded in
// storage_exceeded_seam2_test.go) forces exceeded=true at exactly the Core gate,
// so the renderer takes the warning arm and surfaces it on the 201 response.
//
// Backend is the bufconn-backed fakeProvisioner from
// coverage_provisioner_grpc_test.go (setupGRPCProvFixture), so the twin pipeline
// reaches a real 201 (not a 503) under CI's postgres-only matrix — unlike the
// live-backend seam2 anon/auth tests which skip when the customer backend is
// unreachable.

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"instant.dev/internal/testhelpers"
)

// postTwinDevRaw POSTs a single twin to the development env (which bypasses the
// approval gate) and returns the raw response plus the decoded warning field, so
// a test can assert both the X-Instant-Notice header and the warning JSON the
// StorageExceeded arm sets.
func postTwinDevRaw(t *testing.T, fx grpcProvFixture, sourceToken, jwt string) (*http.Response, string) {
t.Helper()
b, _ := json.Marshal(map[string]any{"env": "development"})
req := httptest.NewRequest(http.MethodPost,
"/api/v1/resources/"+sourceToken+"/provision-twin", strings.NewReader(string(b)))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+jwt)
resp, err := fx.app.Test(req, 15000)
require.NoError(t, err)
raw, _ := io.ReadAll(resp.Body)
var parsed map[string]any
_ = json.Unmarshal(raw, &parsed)
warning, _ := parsed["warning"].(string)
return resp, warning
}

// assertTwinWarningArm asserts the twin renderer surfaced the storage-limit
// warning that the StorageExceeded arm sets (both the JSON field and the notice
// header).
func assertTwinWarningArm(t *testing.T, resp *http.Response, warning string) {
t.Helper()
require.Equal(t, http.StatusCreated, resp.StatusCode)
assert.Equal(t, "storage_limit_reached", resp.Header.Get("X-Instant-Notice"),
"StorageExceeded twin arm must stamp the X-Instant-Notice header")
assert.Contains(t, warning, "Storage limit reached",
"StorageExceeded twin arm must surface the warning field")
}

func TestTwin_DB_StorageExceeded_WarningArm(t *testing.T) {
restore := forceStorageExceeded(t)
defer restore()

fake := &fakeProvisioner{}
fx := setupGRPCProvFixture(t, fake, false)
teamID := testhelpers.MustCreateTeamDB(t, fx.db, "pro")
jwt := authSessionJWT(t, fx.db, teamID)
_, srcToken := seedSourceResource(t, fx.db, teamID, "postgres", "pro", "production")

resp, warning := postTwinDevRaw(t, fx, srcToken, jwt)
defer resp.Body.Close()
assertTwinWarningArm(t, resp, warning)
}

func TestTwin_Cache_StorageExceeded_WarningArm(t *testing.T) {
restore := forceStorageExceeded(t)
defer restore()

fake := &fakeProvisioner{}
fx := setupGRPCProvFixture(t, fake, false)
teamID := testhelpers.MustCreateTeamDB(t, fx.db, "pro")
jwt := authSessionJWT(t, fx.db, teamID)
_, srcToken := seedSourceResource(t, fx.db, teamID, "redis", "pro", "production")

resp, warning := postTwinDevRaw(t, fx, srcToken, jwt)
defer resp.Body.Close()
assertTwinWarningArm(t, resp, warning)
}

func TestTwin_NoSQL_StorageExceeded_WarningArm(t *testing.T) {
restore := forceStorageExceeded(t)
defer restore()

fake := &fakeProvisioner{}
fx := setupGRPCProvFixture(t, fake, false)
teamID := testhelpers.MustCreateTeamDB(t, fx.db, "pro")
jwt := authSessionJWT(t, fx.db, teamID)
_, srcToken := seedSourceResource(t, fx.db, teamID, "mongodb", "pro", "production")

resp, warning := postTwinDevRaw(t, fx, srcToken, jwt)
defer resp.Body.Close()
assertTwinWarningArm(t, resp, warning)
}
Loading