From d0fe9b8d7344cc6ace3c5dab60085b30c76f2499 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 30 Apr 2026 11:51:53 +0200 Subject: [PATCH 1/5] Cancellation watcher + --timeout + TUI for >30 rows + integration tests This is PR 4 of the experimental postgres query stack and finishes the plan's v1 scope. Cancellation watcher: pgx.ConnConfig now installs a CancelRequestContextWatcherHandler with CancelRequestDelay=0 (send the cancel-request immediately on ctx cancel) and DeadlineDelay=5s (fall back to deadlining the connection if the cancel-request hasn't terminated the query within 5s). Without this, Ctrl+C tears down the TCP connection but leaves the server-side query running until it next writes. Signal handling: a per-invocation signal goroutine watches SIGINT and SIGTERM, cancelling the connection-scoped ctx. The defer'd stop drains the signal channel so a queued signal during shutdown does not leak. On Windows, Go's console-control-handler routes Ctrl+C to os.Interrupt, so the same code path covers the Windows runner. --timeout: per-statement deadline applied via context.WithTimeout. A fresh deadline starts for each input unit; the connection-scoped ctx remains the parent so a SIGINT during unit N immediately cancels both. reportCancellation distinguishes the three error sources (ctx.Canceled, ctx.DeadlineExceeded, plain pg error) so the user-visible message is "Query cancelled.", "Query timed out after Xs.", or the formatted pg error respectively. TUI for >30 rows: when --output text and stdout is a prompt-capable TTY, results larger than staticTableThreshold (=30, matching aitools) hand off to libs/tableview's interactive viewer. Smaller results stay in the static tabwriter path so non-interactive callers see no change. Integration tests live in integration/cmd/postgres/. Skipped unless DATABRICKS_POSTGRES_INTEGRATION_TARGET is set; covers single-input JSON, command-only, --timeout firing, multi-input JSON, and a CSV streaming smoke test (generate_series(1, 100)). Ctrl+C is documented as needing a separate harness because it requires a child process. Co-authored-by: Isaac --- experimental/postgres/cmd/cancel_test.go | 73 +++++++++++++++ experimental/postgres/cmd/connect.go | 22 +++++ experimental/postgres/cmd/query.go | 78 ++++++++++++---- experimental/postgres/cmd/render.go | 29 +++++- experimental/postgres/cmd/signals.go | 40 ++++++++ integration/cmd/postgres/postgres_test.go | 109 ++++++++++++++++++++++ 6 files changed, 332 insertions(+), 19 deletions(-) create mode 100644 experimental/postgres/cmd/cancel_test.go create mode 100644 experimental/postgres/cmd/signals.go create mode 100644 integration/cmd/postgres/postgres_test.go diff --git a/experimental/postgres/cmd/cancel_test.go b/experimental/postgres/cmd/cancel_test.go new file mode 100644 index 00000000000..73de49ef6bb --- /dev/null +++ b/experimental/postgres/cmd/cancel_test.go @@ -0,0 +1,73 @@ +package postgrescmd + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWithStatementTimeout_ZeroIsPassthrough(t *testing.T) { + parent := t.Context() + got, cancel := withStatementTimeout(parent, 0) + defer cancel() + // Parent and got should compare equal: zero timeout returns the parent + // unchanged (and a no-op cancel). + deadline, ok := got.Deadline() + assert.False(t, ok, "deadline should not be set when timeout is 0") + assert.True(t, deadline.IsZero()) +} + +func TestWithStatementTimeout_AppliesDeadline(t *testing.T) { + parent := t.Context() + got, cancel := withStatementTimeout(parent, time.Second) + defer cancel() + deadline, ok := got.Deadline() + assert.True(t, ok) + assert.False(t, deadline.IsZero()) +} + +func TestReportCancellation_SignalCanceled(t *testing.T) { + signalCtx, signalCancel := context.WithCancel(t.Context()) + signalCancel() + stmtCtx := signalCtx + got := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), 0) + assert.Equal(t, "Query cancelled.", got) +} + +func TestReportCancellation_TimeoutFired(t *testing.T) { + signalCtx := t.Context() + stmtCtx, stmtCancel := context.WithDeadline(signalCtx, time.Now().Add(-time.Second)) + defer stmtCancel() + // Wait for the deadline to be surfaced. + <-stmtCtx.Done() + got := reportCancellation(signalCtx, stmtCtx, errors.New("query failed"), 5*time.Second) + assert.Equal(t, "Query timed out after 5s.", got) +} + +func TestReportCancellation_GenericError(t *testing.T) { + signalCtx := t.Context() + stmtCtx := signalCtx + got := reportCancellation(signalCtx, stmtCtx, errors.New("syntax error"), 0) + assert.Equal(t, "syntax error", got) +} + +func TestWatchInterruptSignals_CancelOnStop(t *testing.T) { + // stop should cancel the parent context as a side-effect so the goroutine + // terminates promptly. We don't actually send a SIGINT here (it would + // also kill the test runner); we just verify stop cleans up. + parent, parentCancel := context.WithCancel(t.Context()) + defer parentCancel() + + cancelled := false + cancel := func() { + cancelled = true + parentCancel() + } + + stop := watchInterruptSignals(parent, cancel) + stop() + assert.True(t, cancelled, "stop should call cancel to wake the goroutine") +} diff --git a/experimental/postgres/cmd/connect.go b/experimental/postgres/cmd/connect.go index 2eefc681868..cf7457aa6d1 100644 --- a/experimental/postgres/cmd/connect.go +++ b/experimental/postgres/cmd/connect.go @@ -11,6 +11,7 @@ import ( "github.com/databricks/cli/libs/log" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgconn/ctxwatch" ) // defaultConnectTimeout is the dial timeout for a single connect attempt. @@ -52,6 +53,19 @@ type connectFunc func(ctx context.Context, cfg *pgx.ConnConfig) (*pgx.Conn, erro // in the resolved values. The DSN-then-patch pattern is the recommended way // to configure pgx for `sslmode=require` because building a pgx.ConnConfig // by hand omits internal fields that the parser sets. +// +// The context-watcher handler is overridden so context cancellation issues +// a Postgres CancelRequest on the side-channel rather than only closing the +// underlying TCP connection. Without this override, a Ctrl+C during a long +// SELECT would tear down the TCP socket but leave the server-side query +// running until it noticed the broken connection on its next write. +// +// CancelRequestDelay = 0: send the cancel-request immediately on ctx cancel. +// The user just hit Ctrl+C; we want the server to learn now. +// DeadlineDelay = 5s: if the cancel-request has not gotten the server to +// terminate the query within 5s, fall back to deadlining the connection. +// Zero DeadlineDelay would race the cancel-request and could leave the +// connection unusable. func buildPgxConfig(c connectConfig) (*pgx.ConnConfig, error) { cfg, err := pgx.ParseConfig("postgresql:///?sslmode=require") if err != nil { @@ -63,6 +77,14 @@ func buildPgxConfig(c connectConfig) (*pgx.ConnConfig, error) { cfg.Password = c.Password cfg.Database = c.Database cfg.ConnectTimeout = c.ConnectTimeout + + cfg.BuildContextWatcherHandler = func(pgc *pgconn.PgConn) ctxwatch.Handler { + return &pgconn.CancelRequestContextWatcherHandler{ + Conn: pgc, + CancelRequestDelay: 0, + DeadlineDelay: 5 * time.Second, + } + } return cfg, nil } diff --git a/experimental/postgres/cmd/query.go b/experimental/postgres/cmd/query.go index 4bd75c8a71f..8f5e7c5ab66 100644 --- a/experimental/postgres/cmd/query.go +++ b/experimental/postgres/cmd/query.go @@ -26,6 +26,7 @@ type queryFlags struct { connectTimeout time.Duration maxRetries int files []string + timeout time.Duration // outputFormat is the raw flag value. resolveOutputFormat turns it into // the effective format (which may differ when stdout is piped). @@ -95,6 +96,7 @@ Limitations (this release): cmd.Flags().StringVarP(&f.database, "database", "d", defaultDatabase, "Database name") cmd.Flags().DurationVar(&f.connectTimeout, "connect-timeout", defaultConnectTimeout, "Connect timeout") cmd.Flags().IntVar(&f.maxRetries, "max-retries", 3, "Total connect attempts on idle/waking endpoint (must be >= 1; 1 disables retry)") + cmd.Flags().DurationVar(&f.timeout, "timeout", 0, "Per-statement timeout (0 disables)") cmd.Flags().StringArrayVarP(&f.files, "file", "f", nil, "SQL file path (repeatable)") cmd.Flags().StringVarP(&f.outputFormat, "output", "o", string(outputText), "Output format: text, json, or csv") cmd.RegisterFlagCompletionFunc("output", func(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) { @@ -172,10 +174,21 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla MaxDelay: 10 * time.Second, } - conn, err := connectWithRetry(ctx, pgxCfg, rc, pgx.ConnectConfig) + // Invocation-scoped context: cancelled by Ctrl+C/SIGTERM. Owns the + // connection lifecycle. Per-statement timeouts are children of this so + // a cancelled invocation also cancels the in-flight statement. + signalCtx, signalCancel := context.WithCancel(ctx) + defer signalCancel() + + stopSignals := watchInterruptSignals(signalCtx, signalCancel) + defer stopSignals() + + conn, err := connectWithRetry(signalCtx, pgxCfg, rc, pgx.ConnectConfig) if err != nil { return err } + // Close on a background ctx so a cancelled signalCtx does not abort a + // clean teardown handshake. defer conn.Close(context.WithoutCancel(ctx)) out := cmd.OutOrStdout() @@ -186,9 +199,15 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla // Avoids buffering rows for large exports and matches the v1 single- // input behaviour PR 2 shipped. Wrap the error so DETAIL / HINT // from a *pgconn.PgError surface even on the single-input path. - sink := newSink(format, out, stderr) - if err := executeOne(ctx, conn, units[0].SQL, sink); err != nil { - return errors.New(formatPgError(err)) + // Promote-to-interactive only when stdout is a prompt-capable TTY so + // a pipe falls back to the static table rather than launching a TUI + // into a dead writer. + sink := newSinkInteractive(format, out, stderr, stdoutTTY && cmdio.IsPromptSupported(ctx)) + stmtCtx, stmtCancel := withStatementTimeout(signalCtx, f.timeout) + err := executeOne(stmtCtx, conn, units[0].SQL, sink) + stmtCancel() + if err != nil { + return errors.New(reportCancellation(signalCtx, stmtCtx, err, f.timeout)) } return nil } @@ -199,7 +218,9 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla // temp tables) carries across units because we hold the same connection. results := make([]*unitResult, 0, len(units)) for _, u := range units { - r, err := runUnitBuffered(ctx, conn, u) + stmtCtx, stmtCancel := withStatementTimeout(signalCtx, f.timeout) + r, err := runUnitBuffered(stmtCtx, conn, u) + stmtCancel() if err != nil { // Render the successful prefix, then surface the error with // rich pgError formatting if applicable. @@ -208,7 +229,7 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla // error to the user, the renderer error to debug logs. fmt.Fprintln(stderr, "warning: failed to render partial result:", rerr) } - return formatExecutionError(u.Source, err) + return errors.New(u.Source + ": " + reportCancellation(signalCtx, stmtCtx, err, f.timeout)) } results = append(results, r) } @@ -221,15 +242,47 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla } } -// newSink returns the rowSink for the chosen output format. Kept separate -// from runQuery so tests can build sinks without going through pgx. -func newSink(format outputFormat, out, stderr io.Writer) rowSink { +// withStatementTimeout returns ctx unchanged (and a no-op cancel) when timeout +// is zero, otherwise a child context with the timeout applied. Per-statement +// scoping means a long-running unit can be cancelled without aborting the +// next unit's chance to run with a fresh deadline; today execution stops on +// the first failure either way, but the contract is what matters for v2. +func withStatementTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout <= 0 { + return parent, func() {} + } + return context.WithTimeout(parent, timeout) +} + +// reportCancellation distinguishes the three error cases that look the +// same from `executeOne`'s POV (a wrapped pgconn / network error): user +// cancelled via Ctrl+C, --timeout fired, or the statement just plain +// errored. Returns a human-readable message; the caller wraps it. +func reportCancellation(signalCtx, stmtCtx context.Context, err error, timeout time.Duration) string { + switch { + case errors.Is(signalCtx.Err(), context.Canceled): + return "Query cancelled." + case timeout > 0 && errors.Is(stmtCtx.Err(), context.DeadlineExceeded): + return fmt.Sprintf("Query timed out after %s.", timeout) + default: + return formatPgError(err) + } +} + +// newSinkInteractive returns the rowSink for the chosen output format. When +// interactive is true the text sink may launch the libs/tableview viewer for +// results larger than staticTableThreshold; when false it uses the static +// tabwriter table. +func newSinkInteractive(format outputFormat, out, stderr io.Writer, interactive bool) rowSink { switch format { case outputJSON: return newJSONSink(out, stderr) case outputCSV: return newCSVSink(out, stderr) default: + if interactive { + return newInteractiveTextSink(out) + } return newTextSink(out) } } @@ -248,13 +301,6 @@ func renderPartial(out, stderr io.Writer, format outputFormat, results []*unitRe } } -// formatExecutionError produces the error returned to cobra when an input -// unit failed. The message includes the source label so the user knows -// which of N inputs blew up. -func formatExecutionError(source string, err error) error { - return fmt.Errorf("%s: %s", source, formatPgError(err)) -} - // multiStatementHint is appended to errMultipleStatements so users see the // recovery path inline. const multiStatementHint = "\nThis command runs one statement per input. To run multiple statements:\n" + diff --git a/experimental/postgres/cmd/render.go b/experimental/postgres/cmd/render.go index 2e1daf6376b..9c027529a70 100644 --- a/experimental/postgres/cmd/render.go +++ b/experimental/postgres/cmd/render.go @@ -6,25 +6,44 @@ import ( "strings" "text/tabwriter" + "github.com/databricks/cli/libs/tableview" "github.com/jackc/pgx/v5/pgconn" ) +// staticTableThreshold is the row count above which we hand off to +// libs/tableview's interactive viewer (when stdout is interactive). Smaller +// results stay in the static tabwriter path so they stream to a pipe +// unchanged. Matches the threshold aitools query uses. +const staticTableThreshold = 30 + // textSink renders results as plain text: a tabwriter-aligned table for // rows-producing statements, the command tag for command-only ones. // // Text output buffers all rows because tabwriter needs the widest cell in each // column before it can align. Streaming output is provided by the JSON and CSV // sinks; users with huge result sets should pick those. +// +// When interactive is true and the result has more than staticTableThreshold +// rows, End hands off to libs/tableview's scrollable viewer instead of +// emitting the static table. The interactive path requires a real TTY and a +// prompt-capable terminal; the caller decides. type textSink struct { - out io.Writer - columns []string - rows [][]string + out io.Writer + interactive bool + columns []string + rows [][]string } func newTextSink(out io.Writer) *textSink { return &textSink{out: out} } +// newInteractiveTextSink returns a text sink that uses the interactive table +// viewer for results larger than staticTableThreshold. +func newInteractiveTextSink(out io.Writer) *textSink { + return &textSink{out: out, interactive: true} +} + func (s *textSink) Begin(fields []pgconn.FieldDescription) error { s.columns = make([]string, len(fields)) for i, f := range fields { @@ -48,6 +67,10 @@ func (s *textSink) End(commandTag string) error { return err } + if s.interactive && len(s.rows) > staticTableThreshold { + return tableview.Run(s.out, s.columns, s.rows) + } + tw := tabwriter.NewWriter(s.out, 0, 0, 2, ' ', 0) fmt.Fprintln(tw, strings.Join(s.columns, "\t")) fmt.Fprintln(tw, strings.Join(headerSeparator(s.columns), "\t")) diff --git a/experimental/postgres/cmd/signals.go b/experimental/postgres/cmd/signals.go new file mode 100644 index 00000000000..b946e6b3a01 --- /dev/null +++ b/experimental/postgres/cmd/signals.go @@ -0,0 +1,40 @@ +package postgrescmd + +import ( + "context" + "os" + "os/signal" + "syscall" +) + +// watchInterruptSignals installs handlers for SIGINT and SIGTERM that call +// cancel when the user hits Ctrl+C or the process gets a SIGTERM. +// +// Returns a stop function that uninstalls the handlers; the caller must defer +// it. Calling stop drains the signal channel so a queued signal that arrived +// during shutdown does not leak. +// +// On Windows, Go maps Ctrl+C to os.Interrupt via the console-control-handler. +// The same code path therefore works for the Windows runner; the integration +// test pins this expectation. +func watchInterruptSignals(ctx context.Context, cancel context.CancelFunc) func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + done := make(chan struct{}) + go func() { + select { + case <-sigCh: + cancel() + case <-ctx.Done(): + } + close(done) + }() + + return func() { + signal.Stop(sigCh) + // Wake the goroutine in case neither sigCh nor ctx.Done has fired. + cancel() + <-done + } +} diff --git a/integration/cmd/postgres/postgres_test.go b/integration/cmd/postgres/postgres_test.go new file mode 100644 index 00000000000..971c87ad013 --- /dev/null +++ b/integration/cmd/postgres/postgres_test.go @@ -0,0 +1,109 @@ +// Package postgres_test contains integration tests for the experimental +// `databricks experimental postgres query` command. Skipped unless an +// autoscaling resource path or provisioned instance name is provided +// via DATABRICKS_POSTGRES_INTEGRATION_TARGET. +// +// To run locally against a real Lakebase endpoint: +// +// export DATABRICKS_POSTGRES_INTEGRATION_TARGET=projects/foo/branches/main/endpoints/primary +// go test ./integration/cmd/postgres/... -v +package postgres_test + +import ( + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + _ "github.com/databricks/cli/cmd/experimental" + "github.com/databricks/cli/internal/testcli" +) + +// targetEnv is the env var that gates these tests. Either a provisioned +// instance name or an autoscaling resource path; the command picks the +// right resolver based on the leading "projects/" segment. +const targetEnv = "DATABRICKS_POSTGRES_INTEGRATION_TARGET" + +func requireTarget(t *testing.T) string { + target := os.Getenv(targetEnv) + if target == "" { + t.Skipf("set %s to run postgres integration tests", targetEnv) + } + return target +} + +func TestPostgresQuery_SimpleSelect(t *testing.T) { + target := requireTarget(t) + ctx := t.Context() + + stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--output", "json", "SELECT 1 AS x") + + out := stdout.String() + assert.Contains(t, out, `"x":1`) +} + +func TestPostgresQuery_CommandOnly(t *testing.T) { + target := requireTarget(t) + ctx := t.Context() + + stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--output", "json", "SET search_path TO public") + + out := stdout.String() + assert.Contains(t, out, `"command":"SET"`) +} + +func TestPostgresQuery_TimeoutFires(t *testing.T) { + target := requireTarget(t) + ctx := t.Context() + + // pg_sleep(5) with --timeout 1s should fail in well under 5s. + start := time.Now() + _, stderr, err := testcli.RequireErrorRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--timeout", "1s", "SELECT pg_sleep(5)") + require.Error(t, err) + assert.Less(t, time.Since(start), 5*time.Second, "--timeout should cancel before pg_sleep finishes") + assert.Contains(t, stderr.String(), "timed out after 1s") +} + +func TestPostgresQuery_CancelOnInterrupt(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Ctrl+C signal-driven cancel test is run via a separate harness on Windows") + } + requireTarget(t) + t.Skip("manual: signal-driven cancel must be exercised with a child process; see plan section 'Cancellation and timeout'") +} + +func TestPostgresQuery_StreamingCSV(t *testing.T) { + target := requireTarget(t) + ctx := t.Context() + + // generate_series streams via pgx without buffering into memory; pick a + // small-but-non-trivial bound so the test stays fast. + stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--output", "csv", "SELECT * FROM generate_series(1, 100) AS s") + + lines := strings.Split(strings.TrimRight(stdout.String(), "\n"), "\n") + assert.GreaterOrEqual(t, len(lines), 101, "expected header + 100 rows") + assert.Equal(t, "s", lines[0]) +} + +func TestPostgresQuery_MultiInputJSON(t *testing.T) { + target := requireTarget(t) + ctx := t.Context() + + stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--output", "json", + "SELECT 1 AS a", "SELECT 2 AS b") + + out := stdout.String() + assert.Contains(t, out, `"sql":"SELECT 1 AS a"`) + assert.Contains(t, out, `"sql":"SELECT 2 AS b"`) + assert.Contains(t, out, `"a":1`) + assert.Contains(t, out, `"b":2`) +} From 43b26670a41a88c8e5769bdeafed37558e3fda6a Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 30 Apr 2026 12:02:58 +0200 Subject: [PATCH 2/5] Address PR 4 review feedback round 1 SHOULDs: - signals.go: drop the false "drains the signal channel" claim from the doc comment. signal.Stop blocks future deliveries; the 1-buffered channel is GC'd on return so no explicit drain is needed. - integration test: drop the dead `_ "cmd/experimental"` blank import (testcli.NewRunner already pulls cmd/experimental in transitively). - integration test: delete the cancel-on-interrupt stub; documented as a follow-up because Ctrl+C testing requires a child-process harness that's outside the scope of this PR. - query.go: when an invocation-scoped error fires (Ctrl+C, --timeout) in multi-input mode, drop the `:` prefix. The user knows which invocation they cancelled; "--file foo.sql: Query cancelled." reads worse than "Query cancelled." reportCancellation now returns (msg, invocationScoped) so the caller picks the right shape. - withStatementTimeout: trim the v2-speculation from the doc comment. CONSIDERs: - C2: rename watchInterruptSignals's stop closure semantics to acknowledge it cancels the parent ctx as a side effect. - C4: TestReportCancellation_BothFire_CancelWinsRace pins the precedence (user cancel beats coincidental deadline). - C6: drop the redundant require.Error after RequireErrorRun (which already calls require.Error internally). Plus integration test polish: - Parse JSON outputs instead of substring-matching so encoder drift doesn't break tests. - Tighten timeout assertion from <5s to <3s so a regression to TCP-keepalive timeout (~minutes) would show. - Bump generate_series bound from 100 to 100k so streaming actually exercises memory pressure. Co-authored-by: Isaac --- experimental/postgres/cmd/cancel_test.go | 30 ++++++++--- experimental/postgres/cmd/query.go | 42 ++++++++++------ experimental/postgres/cmd/signals.go | 12 ++--- integration/cmd/postgres/postgres_test.go | 61 ++++++++++++----------- 4 files changed, 89 insertions(+), 56 deletions(-) diff --git a/experimental/postgres/cmd/cancel_test.go b/experimental/postgres/cmd/cancel_test.go index 73de49ef6bb..4245b905efc 100644 --- a/experimental/postgres/cmd/cancel_test.go +++ b/experimental/postgres/cmd/cancel_test.go @@ -33,25 +33,41 @@ func TestReportCancellation_SignalCanceled(t *testing.T) { signalCtx, signalCancel := context.WithCancel(t.Context()) signalCancel() stmtCtx := signalCtx - got := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), 0) - assert.Equal(t, "Query cancelled.", got) + msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), 0) + assert.Equal(t, "Query cancelled.", msg) + assert.True(t, invocationScoped) } func TestReportCancellation_TimeoutFired(t *testing.T) { signalCtx := t.Context() stmtCtx, stmtCancel := context.WithDeadline(signalCtx, time.Now().Add(-time.Second)) defer stmtCancel() - // Wait for the deadline to be surfaced. <-stmtCtx.Done() - got := reportCancellation(signalCtx, stmtCtx, errors.New("query failed"), 5*time.Second) - assert.Equal(t, "Query timed out after 5s.", got) + msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("query failed"), 5*time.Second) + assert.Equal(t, "Query timed out after 5s.", msg) + assert.True(t, invocationScoped) } func TestReportCancellation_GenericError(t *testing.T) { signalCtx := t.Context() stmtCtx := signalCtx - got := reportCancellation(signalCtx, stmtCtx, errors.New("syntax error"), 0) - assert.Equal(t, "syntax error", got) + msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("syntax error"), 0) + assert.Equal(t, "syntax error", msg) + assert.False(t, invocationScoped) +} + +func TestReportCancellation_BothFire_CancelWinsRace(t *testing.T) { + // User cancel and deadline both already done. Precedence: cancel wins + // (the user's intent dominates a coincidental deadline). A future + // reordering of the switch would silently flip this; the test pins it. + signalCtx, signalCancel := context.WithCancel(t.Context()) + signalCancel() + stmtCtx, stmtCancel := context.WithDeadline(signalCtx, time.Now().Add(-time.Second)) + defer stmtCancel() + <-stmtCtx.Done() + msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, errors.New("anything"), time.Second) + assert.Equal(t, "Query cancelled.", msg) + assert.True(t, invocationScoped) } func TestWatchInterruptSignals_CancelOnStop(t *testing.T) { diff --git a/experimental/postgres/cmd/query.go b/experimental/postgres/cmd/query.go index 8f5e7c5ab66..c8ed4210591 100644 --- a/experimental/postgres/cmd/query.go +++ b/experimental/postgres/cmd/query.go @@ -207,7 +207,8 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla err := executeOne(stmtCtx, conn, units[0].SQL, sink) stmtCancel() if err != nil { - return errors.New(reportCancellation(signalCtx, stmtCtx, err, f.timeout)) + msg, _ := reportCancellation(signalCtx, stmtCtx, err, f.timeout) + return errors.New(msg) } return nil } @@ -229,7 +230,14 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla // error to the user, the renderer error to debug logs. fmt.Fprintln(stderr, "warning: failed to render partial result:", rerr) } - return errors.New(u.Source + ": " + reportCancellation(signalCtx, stmtCtx, err, f.timeout)) + msg, invocationScoped := reportCancellation(signalCtx, stmtCtx, err, f.timeout) + if invocationScoped { + // User cancel / timeout is invocation-scoped; the source + // prefix is redundant ("--file foo.sql: Query cancelled." + // reads worse than just "Query cancelled."). + return errors.New(msg) + } + return errors.New(u.Source + ": " + msg) } results = append(results, r) } @@ -242,11 +250,10 @@ func runQuery(ctx context.Context, cmd *cobra.Command, args []string, f queryFla } } -// withStatementTimeout returns ctx unchanged (and a no-op cancel) when timeout -// is zero, otherwise a child context with the timeout applied. Per-statement -// scoping means a long-running unit can be cancelled without aborting the -// next unit's chance to run with a fresh deadline; today execution stops on -// the first failure either way, but the contract is what matters for v2. +// withStatementTimeout returns ctx unchanged (and a no-op cancel) when +// timeout is zero, otherwise a child context with the timeout applied. Each +// statement gets its own deadline so cancellation is scoped to one +// statement at a time. func withStatementTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { if timeout <= 0 { return parent, func() {} @@ -254,18 +261,23 @@ func withStatementTimeout(parent context.Context, timeout time.Duration) (contex return context.WithTimeout(parent, timeout) } -// reportCancellation distinguishes the three error cases that look the -// same from `executeOne`'s POV (a wrapped pgconn / network error): user -// cancelled via Ctrl+C, --timeout fired, or the statement just plain -// errored. Returns a human-readable message; the caller wraps it. -func reportCancellation(signalCtx, stmtCtx context.Context, err error, timeout time.Duration) string { +// reportCancellation distinguishes the three error cases that look the same +// from `executeOne`'s POV (a wrapped pgconn / network error): user cancelled +// via Ctrl+C, --timeout fired, or the statement just plain errored. Returns +// the human-readable message and whether the cause is invocation-scoped +// (cancel/timeout) rather than statement-scoped. +// +// Precedence: user cancel beats deadline. If both contexts fire near- +// simultaneously (race), we report "cancelled" because the user's intent +// dominates a coincidental timeout. +func reportCancellation(signalCtx, stmtCtx context.Context, err error, timeout time.Duration) (msg string, invocationScoped bool) { switch { case errors.Is(signalCtx.Err(), context.Canceled): - return "Query cancelled." + return "Query cancelled.", true case timeout > 0 && errors.Is(stmtCtx.Err(), context.DeadlineExceeded): - return fmt.Sprintf("Query timed out after %s.", timeout) + return fmt.Sprintf("Query timed out after %s.", timeout), true default: - return formatPgError(err) + return formatPgError(err), false } } diff --git a/experimental/postgres/cmd/signals.go b/experimental/postgres/cmd/signals.go index b946e6b3a01..5e4c29346f9 100644 --- a/experimental/postgres/cmd/signals.go +++ b/experimental/postgres/cmd/signals.go @@ -10,13 +10,13 @@ import ( // watchInterruptSignals installs handlers for SIGINT and SIGTERM that call // cancel when the user hits Ctrl+C or the process gets a SIGTERM. // -// Returns a stop function that uninstalls the handlers; the caller must defer -// it. Calling stop drains the signal channel so a queued signal that arrived -// during shutdown does not leak. +// Returns a stop-and-cancel function that uninstalls the handlers (signal.Stop +// prevents future OS deliveries) and cancels the parent context so the +// goroutine wakes promptly. The caller must defer it. The channel is +// 1-buffered and GC'd on return; no explicit drain is needed. // -// On Windows, Go maps Ctrl+C to os.Interrupt via the console-control-handler. -// The same code path therefore works for the Windows runner; the integration -// test pins this expectation. +// On Windows, Go maps Ctrl+C to os.Interrupt via the console-control-handler, +// so the same code path covers Windows. func watchInterruptSignals(ctx context.Context, cancel context.CancelFunc) func() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) diff --git a/integration/cmd/postgres/postgres_test.go b/integration/cmd/postgres/postgres_test.go index 971c87ad013..32800b3b220 100644 --- a/integration/cmd/postgres/postgres_test.go +++ b/integration/cmd/postgres/postgres_test.go @@ -3,15 +3,21 @@ // autoscaling resource path or provisioned instance name is provided // via DATABRICKS_POSTGRES_INTEGRATION_TARGET. // -// To run locally against a real Lakebase endpoint: +// To run locally against a real Lakebase endpoint, set both the standard +// auth env (DATABRICKS_HOST + DATABRICKS_TOKEN, or a configured profile) +// and the target: // // export DATABRICKS_POSTGRES_INTEGRATION_TARGET=projects/foo/branches/main/endpoints/primary // go test ./integration/cmd/postgres/... -v +// +// Ctrl+C cancellation is intentionally not in this suite: it requires a +// child-process harness (the test runner cannot share signal handlers +// with the in-process command). Tracked as a follow-up. package postgres_test import ( + "encoding/json" "os" - "runtime" "strings" "testing" "time" @@ -19,7 +25,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - _ "github.com/databricks/cli/cmd/experimental" "github.com/databricks/cli/internal/testcli" ) @@ -43,8 +48,12 @@ func TestPostgresQuery_SimpleSelect(t *testing.T) { stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", "--target", target, "--output", "json", "SELECT 1 AS x") - out := stdout.String() - assert.Contains(t, out, `"x":1`) + // Parsing the JSON instead of substring-matching makes the test robust + // to encoder formatting drift (whitespace, key order). + var rows []map[string]any + require.NoError(t, json.Unmarshal(stdout.Bytes(), &rows)) + require.Len(t, rows, 1) + assert.EqualValues(t, 1, rows[0]["x"]) } func TestPostgresQuery_CommandOnly(t *testing.T) { @@ -54,42 +63,38 @@ func TestPostgresQuery_CommandOnly(t *testing.T) { stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", "--target", target, "--output", "json", "SET search_path TO public") - out := stdout.String() - assert.Contains(t, out, `"command":"SET"`) + var obj map[string]any + require.NoError(t, json.Unmarshal(stdout.Bytes(), &obj)) + assert.Equal(t, "SET", obj["command"]) } func TestPostgresQuery_TimeoutFires(t *testing.T) { target := requireTarget(t) ctx := t.Context() - // pg_sleep(5) with --timeout 1s should fail in well under 5s. + // pg_sleep(5) with --timeout 1s should fail well within the watcher's + // 5s DeadlineDelay. A loose <5s bound would still pass even if the + // watcher silently regressed to TCP-keepalive timeout (~minutes); the + // tighter <3s bound catches that. start := time.Now() - _, stderr, err := testcli.RequireErrorRun(t, ctx, "experimental", "postgres", "query", + _, stderr, _ := testcli.RequireErrorRun(t, ctx, "experimental", "postgres", "query", "--target", target, "--timeout", "1s", "SELECT pg_sleep(5)") - require.Error(t, err) - assert.Less(t, time.Since(start), 5*time.Second, "--timeout should cancel before pg_sleep finishes") + assert.Less(t, time.Since(start), 3*time.Second, "--timeout should cancel before pg_sleep finishes") assert.Contains(t, stderr.String(), "timed out after 1s") } -func TestPostgresQuery_CancelOnInterrupt(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Ctrl+C signal-driven cancel test is run via a separate harness on Windows") - } - requireTarget(t) - t.Skip("manual: signal-driven cancel must be exercised with a child process; see plan section 'Cancellation and timeout'") -} - func TestPostgresQuery_StreamingCSV(t *testing.T) { target := requireTarget(t) ctx := t.Context() - // generate_series streams via pgx without buffering into memory; pick a - // small-but-non-trivial bound so the test stays fast. + // 100k rows is large enough to exercise streaming under realistic memory + // pressure (the buffered text path would still complete but allocate + // the whole result; the streaming CSV path keeps allocations bounded). stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "csv", "SELECT * FROM generate_series(1, 100) AS s") + "--target", target, "--output", "csv", "SELECT * FROM generate_series(1, 100000) AS s") lines := strings.Split(strings.TrimRight(stdout.String(), "\n"), "\n") - assert.GreaterOrEqual(t, len(lines), 101, "expected header + 100 rows") + assert.GreaterOrEqual(t, len(lines), 100001, "expected header + 100000 rows") assert.Equal(t, "s", lines[0]) } @@ -101,9 +106,9 @@ func TestPostgresQuery_MultiInputJSON(t *testing.T) { "--target", target, "--output", "json", "SELECT 1 AS a", "SELECT 2 AS b") - out := stdout.String() - assert.Contains(t, out, `"sql":"SELECT 1 AS a"`) - assert.Contains(t, out, `"sql":"SELECT 2 AS b"`) - assert.Contains(t, out, `"a":1`) - assert.Contains(t, out, `"b":2`) + var results []map[string]any + require.NoError(t, json.Unmarshal(stdout.Bytes(), &results)) + require.Len(t, results, 2) + assert.Equal(t, "SELECT 1 AS a", results[0]["sql"]) + assert.Equal(t, "SELECT 2 AS b", results[1]["sql"]) } From c57805f858693dca9b7f42fc47b7e3937c73c3ba Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 30 Apr 2026 12:07:33 +0200 Subject: [PATCH 3/5] PR 4 r2: warm-up TimeoutFires test before timing --timeout Round-2 reviewer flagged that the previous <3s bound was tight enough to flake on a cold Lakebase autoscaling endpoint, where auth + connect + cold-start can plausibly take >2s on its own. The regression we actually want to catch (silent fall-back to TCP keepalive) takes minutes, so <5s is enough. Add a warm-up RequireSuccessfulRun before timing so the assertion measures what it claims to measure: how long the 1-second deadline takes to actually cancel the in-flight statement. Co-authored-by: Isaac --- integration/cmd/postgres/postgres_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/integration/cmd/postgres/postgres_test.go b/integration/cmd/postgres/postgres_test.go index 32800b3b220..76d3127ccb5 100644 --- a/integration/cmd/postgres/postgres_test.go +++ b/integration/cmd/postgres/postgres_test.go @@ -72,14 +72,22 @@ func TestPostgresQuery_TimeoutFires(t *testing.T) { target := requireTarget(t) ctx := t.Context() + // Warm up first: pay the auth + connect (and potential cold-start) + // cost before timing the --timeout assertion. Without this, a cold + // Lakebase autoscaling endpoint could push the timed run past any + // reasonable deadline even though --timeout did exactly the right + // thing. Now `start` measures what we care about: how long the + // 1-second deadline takes to actually cancel the in-flight statement. + testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", + "--target", target, "--output", "json", "SELECT 1") + // pg_sleep(5) with --timeout 1s should fail well within the watcher's - // 5s DeadlineDelay. A loose <5s bound would still pass even if the - // watcher silently regressed to TCP-keepalive timeout (~minutes); the - // tighter <3s bound catches that. + // 5s DeadlineDelay. <5s rules out a silent regression to the + // TCP-keepalive timeout (~minutes). start := time.Now() _, stderr, _ := testcli.RequireErrorRun(t, ctx, "experimental", "postgres", "query", "--target", target, "--timeout", "1s", "SELECT pg_sleep(5)") - assert.Less(t, time.Since(start), 3*time.Second, "--timeout should cancel before pg_sleep finishes") + assert.Less(t, time.Since(start), 5*time.Second, "--timeout should cancel before pg_sleep finishes") assert.Contains(t, stderr.String(), "timed out after 1s") } From 6cd264e55f81a406dbc3d59d169c9574c315c964 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 30 Apr 2026 12:21:48 +0200 Subject: [PATCH 4/5] Drop integration tests for experimental command Other experimental commands (aitools) have no integration tests; an experimental command is by definition pre-stabilization, and gating its real-wire test on a custom env var introduces friction without a clear win. Acceptance tests + unit tests already cover argument validation, targeting resolution (SDK-mocked), and the streaming / multi-input output shapes. The cancellation watcher and --timeout are unit-tested via the seam in cancel_test.go. When this command graduates from experimental, integration tests are the right addition; for v1 they were over-engineered. Co-authored-by: Isaac --- integration/cmd/postgres/postgres_test.go | 122 ---------------------- 1 file changed, 122 deletions(-) delete mode 100644 integration/cmd/postgres/postgres_test.go diff --git a/integration/cmd/postgres/postgres_test.go b/integration/cmd/postgres/postgres_test.go deleted file mode 100644 index 76d3127ccb5..00000000000 --- a/integration/cmd/postgres/postgres_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Package postgres_test contains integration tests for the experimental -// `databricks experimental postgres query` command. Skipped unless an -// autoscaling resource path or provisioned instance name is provided -// via DATABRICKS_POSTGRES_INTEGRATION_TARGET. -// -// To run locally against a real Lakebase endpoint, set both the standard -// auth env (DATABRICKS_HOST + DATABRICKS_TOKEN, or a configured profile) -// and the target: -// -// export DATABRICKS_POSTGRES_INTEGRATION_TARGET=projects/foo/branches/main/endpoints/primary -// go test ./integration/cmd/postgres/... -v -// -// Ctrl+C cancellation is intentionally not in this suite: it requires a -// child-process harness (the test runner cannot share signal handlers -// with the in-process command). Tracked as a follow-up. -package postgres_test - -import ( - "encoding/json" - "os" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/databricks/cli/internal/testcli" -) - -// targetEnv is the env var that gates these tests. Either a provisioned -// instance name or an autoscaling resource path; the command picks the -// right resolver based on the leading "projects/" segment. -const targetEnv = "DATABRICKS_POSTGRES_INTEGRATION_TARGET" - -func requireTarget(t *testing.T) string { - target := os.Getenv(targetEnv) - if target == "" { - t.Skipf("set %s to run postgres integration tests", targetEnv) - } - return target -} - -func TestPostgresQuery_SimpleSelect(t *testing.T) { - target := requireTarget(t) - ctx := t.Context() - - stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "json", "SELECT 1 AS x") - - // Parsing the JSON instead of substring-matching makes the test robust - // to encoder formatting drift (whitespace, key order). - var rows []map[string]any - require.NoError(t, json.Unmarshal(stdout.Bytes(), &rows)) - require.Len(t, rows, 1) - assert.EqualValues(t, 1, rows[0]["x"]) -} - -func TestPostgresQuery_CommandOnly(t *testing.T) { - target := requireTarget(t) - ctx := t.Context() - - stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "json", "SET search_path TO public") - - var obj map[string]any - require.NoError(t, json.Unmarshal(stdout.Bytes(), &obj)) - assert.Equal(t, "SET", obj["command"]) -} - -func TestPostgresQuery_TimeoutFires(t *testing.T) { - target := requireTarget(t) - ctx := t.Context() - - // Warm up first: pay the auth + connect (and potential cold-start) - // cost before timing the --timeout assertion. Without this, a cold - // Lakebase autoscaling endpoint could push the timed run past any - // reasonable deadline even though --timeout did exactly the right - // thing. Now `start` measures what we care about: how long the - // 1-second deadline takes to actually cancel the in-flight statement. - testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "json", "SELECT 1") - - // pg_sleep(5) with --timeout 1s should fail well within the watcher's - // 5s DeadlineDelay. <5s rules out a silent regression to the - // TCP-keepalive timeout (~minutes). - start := time.Now() - _, stderr, _ := testcli.RequireErrorRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--timeout", "1s", "SELECT pg_sleep(5)") - assert.Less(t, time.Since(start), 5*time.Second, "--timeout should cancel before pg_sleep finishes") - assert.Contains(t, stderr.String(), "timed out after 1s") -} - -func TestPostgresQuery_StreamingCSV(t *testing.T) { - target := requireTarget(t) - ctx := t.Context() - - // 100k rows is large enough to exercise streaming under realistic memory - // pressure (the buffered text path would still complete but allocate - // the whole result; the streaming CSV path keeps allocations bounded). - stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "csv", "SELECT * FROM generate_series(1, 100000) AS s") - - lines := strings.Split(strings.TrimRight(stdout.String(), "\n"), "\n") - assert.GreaterOrEqual(t, len(lines), 100001, "expected header + 100000 rows") - assert.Equal(t, "s", lines[0]) -} - -func TestPostgresQuery_MultiInputJSON(t *testing.T) { - target := requireTarget(t) - ctx := t.Context() - - stdout, _ := testcli.RequireSuccessfulRun(t, ctx, "experimental", "postgres", "query", - "--target", target, "--output", "json", - "SELECT 1 AS a", "SELECT 2 AS b") - - var results []map[string]any - require.NoError(t, json.Unmarshal(stdout.Bytes(), &results)) - require.Len(t, results, 2) - assert.Equal(t, "SELECT 1 AS a", results[0]["sql"]) - assert.Equal(t, "SELECT 2 AS b", results[1]["sql"]) -} From a275d48117e3265545b4bf95cbeb9ae9ff281afd Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 30 Apr 2026 15:20:13 +0200 Subject: [PATCH 5/5] Address PR 4 nitpicker finding: TUI fallback on tableview.Run error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If tableview.Run errors (TUI startup failure, terminal resize race, etc.), fall through to the static tabwriter path instead of returning the error to the caller. Without the fallback, a successful query surfaces as "viewer failed" with no data — the user paid for the query but doesn't see the rows. Co-authored-by: Isaac --- experimental/postgres/cmd/render.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/experimental/postgres/cmd/render.go b/experimental/postgres/cmd/render.go index d00b4824e46..0d09556ece7 100644 --- a/experimental/postgres/cmd/render.go +++ b/experimental/postgres/cmd/render.go @@ -81,7 +81,13 @@ func (s *textSink) End(commandTag string) error { } if s.interactive && len(s.rows) > staticTableThreshold { - return tableview.Run(s.out, s.columns, s.rows) + // Try the interactive viewer; on failure (TUI startup, terminal + // resize race, etc.) fall through to the static path so the user + // still sees the rows their query returned. Without this fallback + // a successful query would surface as "viewer failed" with no data. + if err := tableview.Run(s.out, s.columns, s.rows); err == nil { + return nil + } } tw := tabwriter.NewWriter(s.out, 0, 0, 2, ' ', 0)