Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 6 additions & 35 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
pg_query "github.com/pganalyze/pg_query_go/v6"
"github.com/posthog/duckgres/duckdbservice/arrowmap"
"github.com/posthog/duckgres/server/auth"
"github.com/posthog/duckgres/server/sqlcore"
"github.com/posthog/duckgres/transpiler"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -2476,41 +2477,11 @@ func countDollarParams(query string) int {
return max
}

// isEmptyQuery checks if a query contains only semicolons, whitespace, and/or comments.
// PostgreSQL returns EmptyQueryResponse for queries like ";", ";;;", "-- ping", etc.
func isEmptyQuery(query string) bool {
// Strip SQL comments first (e.g., pgx sends "-- ping" for Ping())
stripped := stripLeadingComments(query)
for _, r := range stripped {
if r != ';' && r != ' ' && r != '\t' && r != '\n' && r != '\r' {
return false
}
}
return true
}

// stripLeadingComments removes leading SQL comments from a query.
// Handles both block comments /* ... */ and line comments -- ...
func stripLeadingComments(query string) string {
for {
query = strings.TrimSpace(query)
if strings.HasPrefix(query, "/*") {
end := strings.Index(query, "*/")
if end == -1 {
return query
}
query = query[end+2:]
} else if strings.HasPrefix(query, "--") {
end := strings.Index(query, "\n")
if end == -1 {
return ""
}
query = query[end+1:]
} else {
return query
}
}
}
// isEmptyQuery and stripLeadingComments moved to server/sqlcore so the
// Flight client can call them without importing server. Local thin wrappers
// preserve the unexported call-site spellings used throughout this file.
func isEmptyQuery(query string) bool { return sqlcore.IsEmptyQuery(query) }
func stripLeadingComments(query string) string { return sqlcore.StripLeadingComments(query) }

// stripLeadingNoise strips leading whitespace, comments, and parentheses from
// a query string in a loop until none remain. This handles cases like
Expand Down
57 changes: 13 additions & 44 deletions server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,21 @@ import (
"context"
"database/sql"
"os"
)

// ColumnTyper provides type name information for a database column.
// *sql.ColumnType satisfies this interface.
type ColumnTyper interface {
DatabaseTypeName() string
}

// RowSet represents a set of rows from a query result.
type RowSet interface {
Columns() ([]string, error)
ColumnTypes() ([]ColumnTyper, error)
Next() bool
Scan(dest ...any) error
Close() error
Err() error
}

// ExecResult represents the result of a non-query execution.
type ExecResult interface {
RowsAffected() (int64, error)
}

// RawConn provides access to the underlying driver connection.
// *sql.Conn satisfies this interface.
type RawConn interface {
Raw(func(any) error) error
Close() error
}

// QueryExecutor abstracts database query execution, allowing both local (*sql.DB)
// and remote (Arrow Flight SQL) backends.
type QueryExecutor interface {
QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)
ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
Query(query string, args ...any) (RowSet, error)
Exec(query string, args ...any) (ExecResult, error)
ConnContext(ctx context.Context) (RawConn, error)
PingContext(ctx context.Context) error
Close() error
"github.com/posthog/duckgres/server/sqlcore"
)

// LastProfilingOutput returns the JSON profiling output from the last
// executed query, or "" if profiling is not enabled or not available
// (e.g. Flight SQL mode where the query ran on a remote worker).
LastProfilingOutput() string
}
// The SQL/result interfaces moved to server/sqlcore so the Flight client
// and other duckdb-free callers can implement them without importing
// server. The aliases below preserve the old server.X spellings for the
// dozens of references inside this package and elsewhere.
type (
ColumnTyper = sqlcore.ColumnTyper
RowSet = sqlcore.RowSet
ExecResult = sqlcore.ExecResult
RawConn = sqlcore.RawConn
QueryExecutor = sqlcore.QueryExecutor
)

// LocalExecutor wraps *sql.DB to implement QueryExecutor for local DuckDB access.
type LocalExecutor struct {
Expand Down
5 changes: 0 additions & 5 deletions server/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ func GenerateSecretKey() int32 {
return generateSecretKey()
}

// IsEmptyQuery checks if a query contains only semicolons, whitespace, and/or SQL comments.
// PostgreSQL returns EmptyQueryResponse for queries like ";", "-- ping", "/* */", etc.
func IsEmptyQuery(query string) bool {
return isEmptyQuery(query)
}

// SetQueryLogger sets the query logger on a Server. Used by the control plane
// to attach a query logger to the minimal server after creation.
Expand Down
28 changes: 14 additions & 14 deletions server/flightclient/flight_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/posthog/duckgres/duckdbservice/arrowmap"
"github.com/posthog/duckgres/server"
"github.com/posthog/duckgres/server/sqlcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -70,7 +70,7 @@ func NewFlightExecutor(addr, bearerToken, sessionToken string) (*FlightExecutor,

// Propagate OTEL trace context across gRPC to worker pods.
// Filtered to query RPCs only (GetFlightInfo, DoGet).
dialOpts = append(dialOpts, server.OTELGRPCClientHandler())
dialOpts = append(dialOpts, sqlcore.OTELGRPCClientHandler())

if bearerToken != "" {
dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(&bearerCreds{token: bearerToken}))
Expand Down Expand Up @@ -156,14 +156,14 @@ func recoverClientPanic(err *error) {
}
}

func (e *FlightExecutor) QueryContext(ctx context.Context, query string, args ...any) (rs server.RowSet, err error) {
func (e *FlightExecutor) QueryContext(ctx context.Context, query string, args ...any) (rs sqlcore.RowSet, err error) {
if e.dead.Load() {
return nil, ErrWorkerDead
}

// Return empty results for queries that are only semicolons, whitespace,
// and/or comments. These represent PostgreSQL client pings (e.g., pgx sends "-- ping").
if server.IsEmptyQuery(query) {
if sqlcore.IsEmptyQuery(query) {
return &emptyRowSet{}, nil
}

Expand Down Expand Up @@ -221,13 +221,13 @@ func (e *FlightExecutor) QueryContext(ctx context.Context, query string, args ..
}, nil
}

func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...any) (result server.ExecResult, err error) {
func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...any) (result sqlcore.ExecResult, err error) {
if e.dead.Load() {
return nil, ErrWorkerDead
}

// Return zero rows affected for empty/comment-only queries.
if server.IsEmptyQuery(query) {
if sqlcore.IsEmptyQuery(query) {
return &flightExecResult{rowsAffected: 0}, nil
}

Expand All @@ -252,15 +252,15 @@ func (e *FlightExecutor) ExecContext(ctx context.Context, query string, args ...
return &flightExecResult{rowsAffected: affected}, nil
}

func (e *FlightExecutor) Query(query string, args ...any) (server.RowSet, error) {
func (e *FlightExecutor) Query(query string, args ...any) (sqlcore.RowSet, error) {
return e.QueryContext(context.Background(), query, args...)
}

func (e *FlightExecutor) Exec(query string, args ...any) (server.ExecResult, error) {
func (e *FlightExecutor) Exec(query string, args ...any) (sqlcore.ExecResult, error) {
return e.ExecContext(context.Background(), query, args...)
}

func (e *FlightExecutor) ConnContext(ctx context.Context) (server.RawConn, error) {
func (e *FlightExecutor) ConnContext(ctx context.Context) (sqlcore.RawConn, error) {
return nil, fmt.Errorf("ConnContext not supported in Flight mode (use batched INSERT for COPY FROM)")
}

Expand Down Expand Up @@ -341,8 +341,8 @@ func (r *FlightRowSet) Columns() ([]string, error) {
return names, nil
}

func (r *FlightRowSet) ColumnTypes() ([]server.ColumnTyper, error) {
types := make([]server.ColumnTyper, r.schema.NumFields())
func (r *FlightRowSet) ColumnTypes() ([]sqlcore.ColumnTyper, error) {
types := make([]sqlcore.ColumnTyper, r.schema.NumFields())
for i := 0; i < r.schema.NumFields(); i++ {
types[i] = &arrowColumnType{dt: r.schema.Field(i).Type}
}
Expand Down Expand Up @@ -428,7 +428,7 @@ func (r *FlightRowSet) Err() error {
type emptyRowSet struct{}

func (e *emptyRowSet) Columns() ([]string, error) { return nil, nil }
func (e *emptyRowSet) ColumnTypes() ([]server.ColumnTyper, error) { return nil, nil }
func (e *emptyRowSet) ColumnTypes() ([]sqlcore.ColumnTyper, error) { return nil, nil }
func (e *emptyRowSet) Next() bool { return false }
func (e *emptyRowSet) Scan(dest ...any) error { return fmt.Errorf("no rows") }
func (e *emptyRowSet) Close() error { return nil }
Expand All @@ -450,8 +450,8 @@ func (e *emptySchemaRowSet) Columns() ([]string, error) {
return cols, nil
}

func (e *emptySchemaRowSet) ColumnTypes() ([]server.ColumnTyper, error) {
types := make([]server.ColumnTyper, e.schema.NumFields())
func (e *emptySchemaRowSet) ColumnTypes() ([]sqlcore.ColumnTyper, error) {
types := make([]sqlcore.ColumnTyper, e.schema.NumFields())
for i := 0; i < e.schema.NumFields(); i++ {
types[i] = &arrowColumnType{dt: e.schema.Field(i).Type}
}
Expand Down
57 changes: 57 additions & 0 deletions server/sqlcore/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Package sqlcore holds the duckgres-internal SQL/result interfaces that
// span the wire-protocol/server layer and the Arrow Flight client. It also
// hosts a couple of small helpers (IsEmptyQuery, OTELGRPCClientHandler)
// shared between those layers.
//
// The package has no dependency on github.com/duckdb/duckdb-go, so any
// caller that wants to operate against duckgres without linking libduckdb
// (notably the Flight client and a future control-plane-only binary) can
// import this package without dragging the DuckDB driver in.
package sqlcore

import "context"

// ColumnTyper provides type name information for a database column.
// *sql.ColumnType satisfies this interface.
type ColumnTyper interface {
DatabaseTypeName() string
}

// RowSet represents a set of rows from a query result.
type RowSet interface {
Columns() ([]string, error)
ColumnTypes() ([]ColumnTyper, error)
Next() bool
Scan(dest ...any) error
Close() error
Err() error
}

// ExecResult represents the result of a non-query execution.
type ExecResult interface {
RowsAffected() (int64, error)
}

// RawConn provides access to the underlying driver connection.
// *sql.Conn satisfies this interface.
type RawConn interface {
Raw(func(any) error) error
Close() error
}

// QueryExecutor abstracts database query execution, allowing both local
// (*sql.DB) and remote (Arrow Flight SQL) backends.
type QueryExecutor interface {
QueryContext(ctx context.Context, query string, args ...any) (RowSet, error)
ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error)
Query(query string, args ...any) (RowSet, error)
Exec(query string, args ...any) (ExecResult, error)
ConnContext(ctx context.Context) (RawConn, error)
PingContext(ctx context.Context) error
Close() error

// LastProfilingOutput returns the JSON profiling output from the last
// executed query, or "" if profiling is not enabled or not available
// (e.g. Flight SQL mode where the query ran on a remote worker).
LastProfilingOutput() string
}
2 changes: 1 addition & 1 deletion server/otelgrpc_filter.go → server/sqlcore/otel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package sqlcore

import (
"strings"
Expand Down
41 changes: 41 additions & 0 deletions server/sqlcore/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sqlcore

import "strings"

// IsEmptyQuery checks if a query contains only semicolons, whitespace, and/or
// SQL comments. PostgreSQL returns EmptyQueryResponse for queries like ";",
// "-- ping", "/* */", etc.
func IsEmptyQuery(query string) bool {
stripped := StripLeadingComments(query)
for _, r := range stripped {
if r != ';' && r != ' ' && r != '\t' && r != '\n' && r != '\r' {
return false
}
}
return true
}

// StripLeadingComments removes leading SQL comments from a query.
// Handles both block comments /* ... */ and line comments -- ...
func StripLeadingComments(query string) string {
for {
query = strings.TrimSpace(query)
if strings.HasPrefix(query, "/*") {
end := strings.Index(query, "*/")
if end == -1 {
return query
}
query = query[end+2:]
continue
}
if strings.HasPrefix(query, "--") {
nl := strings.IndexByte(query, '\n')
if nl == -1 {
return ""
}
query = query[nl+1:]
continue
}
return query
}
}
13 changes: 13 additions & 0 deletions server/sqlcore_aliases.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package server

import "github.com/posthog/duckgres/server/sqlcore"

// Re-exports kept here so existing references to server.IsEmptyQuery and
// server.OTELGRPCClientHandler continue to compile after the helpers
// moved into server/sqlcore. The sqlcore package is duckdb-free; new code
// (notably the Flight client) should import server/sqlcore directly.

var (
IsEmptyQuery = sqlcore.IsEmptyQuery
OTELGRPCClientHandler = sqlcore.OTELGRPCClientHandler
)