Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ ARG BUILD_TAGS=""
ARG TARGETARCH
ARG DUCKDB_EXTENSION_VERSION=1.5.2
ARG HTTPFS_EXTENSION_TAG=v1.5.2-stoi-fix
ARG DUCKDB_EXTENSION_REPOSITORY=https://extensions.duckdb.org
ARG DUCKDB_NIGHTLY_EXTENSION_REPOSITORY=http://nightly-extensions.duckdb.org
RUN CGO_ENABLED=1 go build -tags "${BUILD_TAGS}" -ldflags "-X main.version=${VERSION} -X main.commit=${COMMIT} -X main.date=$(date -u +%Y-%m-%dT%H:%M:%SZ)" -o duckgres .
RUN mkdir -p "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}" \
&& curl -fsSL "https://github.com/benben/duckdb-httpfs/releases/download/${HTTPFS_EXTENSION_TAG}/httpfs-linux-${TARGETARCH}.duckdb_extension" \
-o "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/httpfs.duckdb_extension" \
&& for ext in ducklake postgres_scanner json; do \
curl -fsSL "https://extensions.duckdb.org/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension.gz" \
&& for ext in ducklake json; do \
curl -fsSL "${DUCKDB_EXTENSION_REPOSITORY}/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension.gz" \
| gunzip > "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/${ext}.duckdb_extension"; \
done
done \
&& curl -fsSL "${DUCKDB_NIGHTLY_EXTENSION_REPOSITORY}/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/postgres_scanner.duckdb_extension.gz" \
| gunzip > "/build/duckdb-extensions/v${DUCKDB_EXTENSION_VERSION}/linux_${TARGETARCH}/postgres_scanner.duckdb_extension"

FROM debian:bookworm-slim

Expand Down
156 changes: 154 additions & 2 deletions server/bundled_extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"os"
"path/filepath"
"sync"
"testing"
)

Expand Down Expand Up @@ -41,15 +42,15 @@ func TestSeedBundledExtensionsCopiesMissingFiles(t *testing.T) {
}
}

func TestSeedBundledExtensionsPreservesExistingFiles(t *testing.T) {
func TestSeedBundledExtensionsPreservesExistingFilesWithMatchingContents(t *testing.T) {
srcRoot := t.TempDir()
dstRoot := t.TempDir()

srcDir := filepath.Join(srcRoot, "v1.5.2", "linux_arm64")
if err := os.MkdirAll(srcDir, 0o755); err != nil {
t.Fatalf("mkdir src: %v", err)
}
if err := os.WriteFile(filepath.Join(srcDir, "httpfs.duckdb_extension"), []byte("new"), 0o644); err != nil {
if err := os.WriteFile(filepath.Join(srcDir, "httpfs.duckdb_extension"), []byte("existing"), 0o644); err != nil {
t.Fatalf("write src extension: %v", err)
}

Expand All @@ -74,3 +75,154 @@ func TestSeedBundledExtensionsPreservesExistingFiles(t *testing.T) {
t.Fatalf("expected existing extension to be preserved, got %q", string(got))
}
}

// TestSeedBundledExtensionsReplacesExistingFilesWithUpdatedContents checks that
// seedBundledExtensions overwrites a postgres_scanner extension that is already
// present in the destination tree when the bundled copy has different contents.
func TestSeedBundledExtensionsReplacesExistingFilesWithUpdatedContents(t *testing.T) {
	const extName = "postgres_scanner.duckdb_extension"

	bundledRoot, seededRoot := t.TempDir(), t.TempDir()

	// Bundled tree: the copy that is expected to win.
	bundledDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(bundledDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir src: %v", mkErr)
	}
	if wrErr := os.WriteFile(filepath.Join(bundledDir, extName), []byte("nightly"), 0o644); wrErr != nil {
		t.Fatalf("write src extension: %v", wrErr)
	}

	// Destination tree: already holds a copy with different contents.
	seededDir := filepath.Join(seededRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(seededDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir dst: %v", mkErr)
	}
	seededExt := filepath.Join(seededDir, extName)
	if wrErr := os.WriteFile(seededExt, []byte("stable"), 0o644); wrErr != nil {
		t.Fatalf("write dst extension: %v", wrErr)
	}

	if seedErr := seedBundledExtensions(bundledRoot, seededRoot); seedErr != nil {
		t.Fatalf("seedBundledExtensions: %v", seedErr)
	}

	contents, readErr := os.ReadFile(seededExt)
	if readErr != nil {
		t.Fatalf("read dst extension: %v", readErr)
	}
	if actual := string(contents); actual != "nightly" {
		t.Fatalf("expected existing extension to be replaced, got %q", actual)
	}
}

// TestSeedBundledExtensionsPreservesNonTargetedChangedFiles checks that
// seedBundledExtensions leaves an existing httpfs extension in the destination
// untouched even though the bundled copy has different contents.
func TestSeedBundledExtensionsPreservesNonTargetedChangedFiles(t *testing.T) {
	const extName = "httpfs.duckdb_extension"

	bundledRoot, seededRoot := t.TempDir(), t.TempDir()

	// Bundled tree: a differing copy that must NOT be propagated.
	bundledDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(bundledDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir src: %v", mkErr)
	}
	if wrErr := os.WriteFile(filepath.Join(bundledDir, extName), []byte("new-httpfs"), 0o644); wrErr != nil {
		t.Fatalf("write src extension: %v", wrErr)
	}

	// Destination tree: the copy that is expected to survive seeding.
	seededDir := filepath.Join(seededRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(seededDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir dst: %v", mkErr)
	}
	seededExt := filepath.Join(seededDir, extName)
	if wrErr := os.WriteFile(seededExt, []byte("existing-httpfs"), 0o644); wrErr != nil {
		t.Fatalf("write dst extension: %v", wrErr)
	}

	if seedErr := seedBundledExtensions(bundledRoot, seededRoot); seedErr != nil {
		t.Fatalf("seedBundledExtensions: %v", seedErr)
	}

	contents, readErr := os.ReadFile(seededExt)
	if readErr != nil {
		t.Fatalf("read dst extension: %v", readErr)
	}
	if actual := string(contents); actual != "existing-httpfs" {
		t.Fatalf("expected non-targeted extension to be preserved, got %q", actual)
	}
}

// TestBootstrapBundledExtensionsSeedsBundledExtensions checks that
// bootstrapBundledExtensions copies a bundled extension into the "extensions"
// directory under the given data directory.
func TestBootstrapBundledExtensionsSeedsBundledExtensions(t *testing.T) {
	const extName = "postgres_scanner.duckdb_extension"

	bundledRoot, dataDir := t.TempDir(), t.TempDir()

	bundledDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(bundledDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir src: %v", mkErr)
	}
	if wrErr := os.WriteFile(filepath.Join(bundledDir, extName), []byte("nightly"), 0o644); wrErr != nil {
		t.Fatalf("write src extension: %v", wrErr)
	}

	// Point the package at the temporary bundle for the duration of the test;
	// the deferred call captures the current value and puts it back on return.
	defer func(saved string) { bundledDuckDBExtensionsDir = saved }(bundledDuckDBExtensionsDir)
	bundledDuckDBExtensionsDir = bundledRoot

	// Forget any bootstrap results recorded earlier in this process.
	bundledExtensionBootstrap = struct {
		mu     sync.Mutex
		byPath map[string]error
	}{}

	if bootErr := bootstrapBundledExtensions(dataDir); bootErr != nil {
		t.Fatalf("bootstrapBundledExtensions: %v", bootErr)
	}

	seededExt := filepath.Join(dataDir, "extensions", "v1.5.2", "linux_arm64", extName)
	contents, readErr := os.ReadFile(seededExt)
	if readErr != nil {
		t.Fatalf("read dst extension: %v", readErr)
	}
	if actual := string(contents); actual != "nightly" {
		t.Fatalf("expected seeded extension to match bundled contents, got %q", actual)
	}
}

// TestBootstrapBundledExtensionsRunsOncePerExtensionDirectory checks that a
// second bootstrapBundledExtensions call for the same data directory does not
// seed again: the bundled file is rewritten between the two calls and the
// destination must still hold the contents from the first call.
func TestBootstrapBundledExtensionsRunsOncePerExtensionDirectory(t *testing.T) {
	const extName = "postgres_scanner.duckdb_extension"

	bundledRoot, dataDir := t.TempDir(), t.TempDir()
	seededExt := filepath.Join(dataDir, "extensions", "v1.5.2", "linux_arm64", extName)

	bundledDir := filepath.Join(bundledRoot, "v1.5.2", "linux_arm64")
	if mkErr := os.MkdirAll(bundledDir, 0o755); mkErr != nil {
		t.Fatalf("mkdir src: %v", mkErr)
	}
	bundledExt := filepath.Join(bundledDir, extName)
	if wrErr := os.WriteFile(bundledExt, []byte("nightly"), 0o644); wrErr != nil {
		t.Fatalf("write src extension: %v", wrErr)
	}

	// Point the package at the temporary bundle for the duration of the test;
	// the deferred call captures the current value and puts it back on return.
	defer func(saved string) { bundledDuckDBExtensionsDir = saved }(bundledDuckDBExtensionsDir)
	bundledDuckDBExtensionsDir = bundledRoot

	// Forget any bootstrap results recorded earlier in this process.
	bundledExtensionBootstrap = struct {
		mu     sync.Mutex
		byPath map[string]error
	}{}

	if bootErr := bootstrapBundledExtensions(dataDir); bootErr != nil {
		t.Fatalf("bootstrapBundledExtensions: %v", bootErr)
	}

	// Change the bundled file; a repeated bootstrap must not pick this up.
	if wrErr := os.WriteFile(bundledExt, []byte("newer-nightly"), 0o644); wrErr != nil {
		t.Fatalf("rewrite src extension: %v", wrErr)
	}
	if bootErr := bootstrapBundledExtensions(dataDir); bootErr != nil {
		t.Fatalf("second bootstrapBundledExtensions: %v", bootErr)
	}

	contents, readErr := os.ReadFile(seededExt)
	if readErr != nil {
		t.Fatalf("read dst extension: %v", readErr)
	}
	if actual := string(contents); actual != "nightly" {
		t.Fatalf("expected bootstrap to run once, got %q", actual)
	}
}
95 changes: 67 additions & 28 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,36 @@ var processVersion = "dev"
// clients pinning connection goroutines indefinitely.
var startupReadTimeout = 30 * time.Second

const bundledDuckDBExtensionsDir = "/app/extensions"
var bundledDuckDBExtensionsDir = "/app/extensions"

// bundledExtensionBootstrap remembers the outcome of seeding bundled
// extensions for each extension directory, so that bootstrapBundledExtensions
// seeds a given directory at most once per process.
var bundledExtensionBootstrap struct {
// mu guards byPath.
mu sync.Mutex
// byPath maps an extension directory to the error (nil on success) returned
// by the first seeding attempt for that directory. It is allocated lazily by
// bootstrapBundledExtensions.
byPath map[string]error
}

// SetProcessVersion records v as the version string of this process.
// It is called from main().
func SetProcessVersion(v string) {
	processVersion = v
}

// ProcessVersion reports the version string currently recorded for this
// process.
func ProcessVersion() string {
	return processVersion
}

// bootstrapBundledExtensions seeds the "extensions" directory under dataDir
// from bundledDuckDBExtensionsDir, at most once per extension directory for
// the lifetime of the process. The result of the first attempt, including any
// error, is remembered and returned to later callers for the same directory
// without seeding again.
func bootstrapBundledExtensions(dataDir string) error {
	target := filepath.Join(dataDir, "extensions")
	state := &bundledExtensionBootstrap

	state.mu.Lock()
	defer state.mu.Unlock()

	// Reading from a nil map is safe, so the lookup can precede allocation.
	if cached, seen := state.byPath[target]; seen {
		return cached
	}
	if state.byPath == nil {
		state.byPath = make(map[string]error)
	}

	result := seedBundledExtensions(bundledDuckDBExtensionsDir, target)
	state.byPath[target] = result
	return result
}

// passwordPattern matches password=<value> or password: <value> with quoted or
// unquoted values. The match is case-insensitive and allows optional
// whitespace around the separator. Capture group 1 is the "password" key plus
// separator; capture group 2 is the value, either a double-quoted string or a
// run of characters containing no whitespace and no double quote.
var passwordPattern = regexp.MustCompile(`(?i)(password\s*[=:]\s*)("[^"]*"|[^\s"]+)`)

Expand Down Expand Up @@ -477,6 +499,10 @@ func New(cfg Config) (*Server, error) {
ensureDuckLakeMigrationCheck(cfg.DuckLake, cfg.DataDir)
}

if err := bootstrapBundledExtensions(cfg.DataDir); err != nil {
slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(cfg.DataDir, "extensions"), "error", err)
}

// Initialize query logger (non-fatal on error)
if ql, err := NewQueryLogger(cfg); err != nil {
slog.Warn("Failed to initialize query log, continuing without it.", "error", err)
Expand Down Expand Up @@ -873,9 +899,6 @@ func openBaseDB(cfg Config, username string) (*sql.DB, error) {
// Set extension directory under DataDir so DuckDB doesn't rely on $HOME/.duckdb
// for autoloading/installing extensions.
extDir := filepath.Join(cfg.DataDir, "extensions")
if err := seedBundledExtensions(bundledDuckDBExtensionsDir, extDir); err != nil {
slog.Warn("Failed to seed bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", extDir, "error", err)
}
if _, err := db.Exec(fmt.Sprintf("SET extension_directory = '%s'", extDir)); err != nil {
slog.Warn("Failed to set DuckDB extension_directory.", "extension_directory", extDir, "error", err)
} else {
Expand Down Expand Up @@ -964,38 +987,54 @@ func seedBundledExtensions(srcRoot, dstRoot string) error {
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
if _, err := os.Stat(dstPath); err == nil {
return nil
} else if !errors.Is(err, os.ErrNotExist) {
if err := os.MkdirAll(filepath.Dir(dstPath), 0o750); err != nil {
return err
}

srcFile, err := os.Open(path)
if err != nil {
if _, err := os.Stat(dstPath); err == nil {
if !shouldRefreshBundledExtension(path) {
return nil
}
} else if !errors.Is(err, os.ErrNotExist) {
return err
}

dstFile, err := os.OpenFile(dstPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o640)
if err != nil {
_ = srcFile.Close()
return err
}
if _, err := io.Copy(dstFile, srcFile); err != nil {
_ = srcFile.Close()
_ = dstFile.Close()
return err
}
if err := srcFile.Close(); err != nil {
_ = dstFile.Close()
return err
}
if err := dstFile.Close(); err != nil {
return err
}
return nil
return copyFile(path, dstPath, info.Mode().Perm())
})
}

// shouldRefreshBundledExtension reports whether the bundled file at srcPath
// should overwrite a copy that already exists at the destination. Only a file
// whose base name is postgres_scanner.duckdb_extension qualifies; the
// directory part of srcPath is ignored.
func shouldRefreshBundledExtension(srcPath string) bool {
	switch filepath.Base(srcPath) {
	case "postgres_scanner.duckdb_extension":
		return true
	default:
		return false
	}
}

// copyFile copies the file at srcPath to dstPath with permission bits mode,
// replacing any file that already exists at dstPath.
//
// The data is written to a temporary file created in dstPath's directory and
// then moved into place with os.Rename, so dstPath is swapped in a single
// step rather than being rewritten in place. The temporary file is flushed
// to stable storage before the rename: seedBundledExtensions skips most files
// that already exist at the destination, so a file left truncated by a crash
// shortly after the rename would otherwise not be repaired on the next start.
//
// The directory containing dstPath must already exist. On any failure the
// temporary file is removed, dstPath is left as it was, and the underlying
// error is returned.
func copyFile(srcPath, dstPath string, mode os.FileMode) error {
	srcFile, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	// The source is only read, so a Close error cannot lose data.
	defer func() { _ = srcFile.Close() }()

	tmpFile, err := os.CreateTemp(filepath.Dir(dstPath), ".bundled-extension-*")
	if err != nil {
		return err
	}
	tmpPath := tmpFile.Name()

	// Best-effort cleanup: remove the temporary file unless it has been
	// renamed into place, in which case the name no longer belongs to us.
	renamed := false
	defer func() {
		if !renamed {
			_ = os.Remove(tmpPath)
		}
	}()

	// Set the final permissions before any data is written so the file never
	// becomes visible at dstPath with the temporary file's default mode.
	if err := tmpFile.Chmod(mode); err != nil {
		_ = tmpFile.Close()
		return err
	}
	if _, err := io.Copy(tmpFile, srcFile); err != nil {
		_ = tmpFile.Close()
		return err
	}
	// Flush file contents before the rename publishes the new name.
	if err := tmpFile.Sync(); err != nil {
		_ = tmpFile.Close()
		return err
	}
	if err := tmpFile.Close(); err != nil {
		return err
	}

	if err := os.Rename(tmpPath, dstPath); err != nil {
		return err
	}
	renamed = true
	return nil
}

// CreateDBConnection creates a DuckDB connection for a client session.
// Uses in-memory database as an anchor for DuckLake attachment (actual data lives in RDS/S3).
// This is a standalone function so it can be reused by both the server and control plane workers.
Expand Down
4 changes: 4 additions & 0 deletions server/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
Expand All @@ -18,6 +19,9 @@ import (
// DuckLake, and pg_catalog views are all available.
func RunShell(cfg Config) {
sem := make(chan struct{}, 1)
if err := bootstrapBundledExtensions(cfg.DataDir); err != nil {
slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(cfg.DataDir, "extensions"), "error", err)
}

db, err := CreateDBConnection(cfg, sem, "shell", processStartTime, processVersion)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion server/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -293,6 +294,10 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int {
parentVersion = cfg.ServerVersion
}

if err := bootstrapBundledExtensions(serverCfg.DataDir); err != nil {
slog.Warn("Failed to bootstrap bundled DuckDB extensions.", "source", bundledDuckDBExtensionsDir, "extension_directory", filepath.Join(serverCfg.DataDir, "extensions"), "error", err)
}

// Create DuckDB connection
db, err := CreateDBConnection(serverCfg, make(chan struct{}, 1), username, parentStartTime, parentVersion)
if err != nil {
Expand Down Expand Up @@ -382,4 +387,3 @@ func runChildWorker(tcpConn *net.TCPConn, cfg *ChildConfig) int {
return ExitSuccess
}
}

Loading
Loading