Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions appliance/postgresql/cmd/flynn-postgres-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"os"
"strings"

"context"
"github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/httphelper"
"github.com/flynn/flynn/pkg/postgres"
"github.com/flynn/flynn/pkg/random"
"github.com/flynn/flynn/pkg/resource"
"github.com/flynn/flynn/pkg/shutdown"
"github.com/julienschmidt/httprouter"
"context"
)

const (
Expand Down Expand Up @@ -78,7 +78,7 @@ func (p *pgAPI) createDatabase(ctx context.Context, w http.ResponseWriter, req *
httphelper.Error(w, err)
return
}
if err := p.db.Exec(fmt.Sprintf(`CREATE DATABASE "%s"`, database)); err != nil {
if err := p.db.Exec(fmt.Sprintf(`CREATE DATABASE "%s" OWNER "%s"`, database, username)); err != nil {
p.db.Exec(fmt.Sprintf(`DROP USER "%s"`, username))
httphelper.Error(w, err)
return
Expand Down
76 changes: 44 additions & 32 deletions appliance/postgresql/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,9 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) {
if p.running() && p.config().Role == state.RoleSync {
log.Info("promoting to primary")

if err := os.WriteFile(p.triggerPath(), nil, 0655); err != nil {
log.Error("error creating trigger file", "path", p.triggerPath(), "err", err)
// PG 16 removed promote_trigger_file; use pg_ctl promote instead
if err := p.runCmd(exec.Command(p.binPath("pg_ctl"), "promote", "-D", p.dataDir)); err != nil {
log.Error("error promoting standby", "err", err)
return err
}

Expand All @@ -369,6 +370,10 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) {

if err := os.Remove(p.recoveryConfPath()); err != nil && !os.IsNotExist(err) {
log.Error("error removing recovery.conf", "path", p.recoveryConfPath(), "err", err)
// non-fatal, PG 16 doesn't use recovery.conf
}
if err := os.Remove(p.standbySignalPath()); err != nil && !os.IsNotExist(err) {
log.Error("error removing standby.signal", "path", p.standbySignalPath(), "err", err)
return err
}

Expand Down Expand Up @@ -483,6 +488,14 @@ func (p *Process) installExtensionsInTemplate() error {
return fmt.Errorf("creating extension %s in template1: %s", ext, err)
}
}

// In PostgreSQL 15+, the default CREATE privilege on the public schema was
// revoked for non-owner roles. Restore the PG14 behavior so that application
// users can create tables in the public schema of their databases.
if _, err := templateDB.Exec("GRANT ALL ON SCHEMA public TO PUBLIC"); err != nil {
return fmt.Errorf("granting public schema privileges in template1: %s", err)
}

return nil
}

Expand Down Expand Up @@ -536,12 +549,8 @@ func (p *Process) assumeStandby(upstream, downstream *discoverd.Instance) error
os.Remove(p.triggerPath())
}

if err := p.writeConfig(configData{ReadOnly: true}); err != nil {
log.Error("error writing postgres.conf", "path", p.configPath(), "err", err)
return err
}
if err := p.writeRecoveryConf(upstream); err != nil {
log.Error("error writing recovery.conf", "path", p.recoveryConfPath(), "err", err)
if err := p.writeRecoveryConf(upstream, configData{ReadOnly: true}); err != nil {
log.Error("error writing recovery config", "path", p.configPath(), "err", err)
return err
}

Expand Down Expand Up @@ -897,21 +906,21 @@ func (p *Process) writeConfig(d configData) error {
return configTemplate.Execute(f, d)
}

func (p *Process) writeRecoveryConf(upstream *discoverd.Instance) error {
data := recoveryData{
TriggerFile: p.triggerPath(),
PrimaryInfo: fmt.Sprintf(
"host=%s port=%s user=flynn password=%s application_name=%s",
upstream.Host(), upstream.Port(), p.password, p.id,
),
}
func (p *Process) recoveryConfigData(upstream *discoverd.Instance, d configData) configData {
d.PrimaryConnInfo = fmt.Sprintf(
"host=%s port=%s user=flynn password=%s application_name=%s",
upstream.Host(), upstream.Port(), p.password, p.id,
)
return d
}

f, err := os.Create(p.recoveryConfPath())
if err != nil {
func (p *Process) writeRecoveryConf(upstream *discoverd.Instance, d configData) error {
// PG 12+ uses postgresql.conf for recovery settings + standby.signal file
if err := p.writeConfig(p.recoveryConfigData(upstream, d)); err != nil {
return err
}
defer f.Close()
return recoveryConfTemplate.Execute(f, data)
// Create standby.signal to indicate this is a standby
return os.WriteFile(p.standbySignalPath(), nil, 0644)
}

func (p *Process) writeHBAConf() error {
Expand All @@ -923,9 +932,14 @@ func (p *Process) configPath() string {
}

func (p *Process) recoveryConfPath() string {
// PG 16 no longer uses recovery.conf; keep this for cleanup of old files
return p.dataPath("recovery.conf")
}

func (p *Process) standbySignalPath() string {
return p.dataPath("standby.signal")
}

func (p *Process) hbaConfPath() string {
return p.dataPath("pg_hba.conf")
}
Expand All @@ -951,6 +965,9 @@ type configData struct {
TimescaleDB bool
ExtWhitelist bool
SHMType string

// Recovery settings (PG 12+ uses postgresql.conf instead of recovery.conf)
PrimaryConnInfo string
}

var configTemplate = template.Must(template.New("postgresql.conf").Parse(`
Expand All @@ -960,10 +977,10 @@ port = {{.Port}}
ssl = off
max_connections = 400
shared_buffers = 32MB
wal_level = hot_standby
wal_level = replica
fsync = on
max_wal_senders = 15
wal_keep_segments = 128
wal_keep_size = 2048
synchronous_commit = remote_write
synchronous_standby_names = '{{.Sync}}'
{{if .ReadOnly}}
Expand All @@ -985,6 +1002,7 @@ datestyle = 'iso, mdy'
timezone = 'UTC'
client_encoding = 'UTF8'
default_text_search_config = 'pg_catalog.english'
password_encryption = md5

{{if .TimescaleDB}}
shared_preload_libraries = 'timescaledb'
Expand All @@ -998,18 +1016,12 @@ dynamic_shared_memory_type = '{{.SHMType}}'
local_preload_libraries = 'pgextwlist'
extwlist.extensions = 'btree_gin,btree_gist,chkpass,citext,cube,dblink,dict_int,earthdistance,fuzzystrmatch,hstore,intarray,isn,ltree,pg_prewarm,pg_stat_statements,pg_trgm,pgcrypto,pgrouting,pgrowlocks,pgstattuple,plpgsql,plv8,postgis,postgis_topology,postgres_fdw,tablefunc,timescaledb,unaccent,uuid-ossp'
{{end}}
`[1:]))

type recoveryData struct {
PrimaryInfo string
TriggerFile string
}

var recoveryConfTemplate = template.Must(template.New("recovery.conf").Parse(`
standby_mode = on
primary_conninfo = '{{.PrimaryInfo}}'
trigger_file = '{{.TriggerFile}}'
{{if .PrimaryConnInfo}}
# Recovery settings (managed by flynn-postgres, PG 12+)
primary_conninfo = '{{.PrimaryConnInfo}}'
recovery_target_timeline = 'latest'
{{end}}
`[1:]))

var hbaConf = []byte(`
Expand Down
8 changes: 4 additions & 4 deletions controller/data/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ $$ LANGUAGE plpgsql`,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
)`,
`CREATE FUNCTION check_job_state() RETURNS OPAQUE AS $$
`CREATE FUNCTION check_job_state() RETURNS trigger AS $$
BEGIN
IF NEW.state < OLD.state THEN
RAISE EXCEPTION 'invalid job state transition: % -> %', OLD.state, NEW.state USING ERRCODE = 'check_violation';
Expand Down Expand Up @@ -309,7 +309,7 @@ $$ LANGUAGE plpgsql`,

// add a check to ensure releases only have a single "docker"
// artifact, and that artifact is added first
`CREATE FUNCTION check_release_artifacts() RETURNS OPAQUE AS $$
`CREATE FUNCTION check_release_artifacts() RETURNS trigger AS $$
BEGIN
IF (
SELECT COUNT(*)
Expand Down Expand Up @@ -359,7 +359,7 @@ $$ LANGUAGE plpgsql`,
`INSERT INTO event_types (name) VALUES ('release_deletion')`,

// add a trigger to prevent current app releases from being deleted
`CREATE FUNCTION check_release_delete() RETURNS OPAQUE AS $$
`CREATE FUNCTION check_release_delete() RETURNS trigger AS $$
BEGIN
IF NEW.deleted_at IS NOT NULL AND (SELECT COUNT(*) FROM apps WHERE release_id = NEW.release_id) != 0 THEN
RAISE EXCEPTION 'cannot delete current app release' USING ERRCODE = 'check_violation';
Expand Down Expand Up @@ -432,7 +432,7 @@ $$ LANGUAGE plpgsql`,
`ALTER TABLE artifacts ADD COLUMN hashes jsonb`,
`ALTER TABLE artifacts ADD COLUMN size integer`,
`ALTER TABLE artifacts ADD COLUMN layer_url_template text`,
`CREATE FUNCTION check_artifact_manifest() RETURNS OPAQUE AS $$
`CREATE FUNCTION check_artifact_manifest() RETURNS trigger AS $$
BEGIN
IF NEW.type = 'flynn' AND NEW.manifest IS NULL THEN
RAISE EXCEPTION 'flynn artifacts must have a manifest' USING ERRCODE = 'check_violation';
Expand Down
2 changes: 1 addition & 1 deletion controller/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Load(schemaRoot string) error {

var schemaPaths []string
walkFn := func(path string, info os.FileInfo, err error) error {
if !info.IsDir() && filepath.Ext(path) == ".json" {
if !info.IsDir() && filepath.Ext(path) == ".json" && !strings.HasPrefix(filepath.Base(path), "._") {
schemaPaths = append(schemaPaths, path)
}
return nil
Expand Down
20 changes: 18 additions & 2 deletions flannel/backend/vxlan/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type vxlanDeviceAttrs struct {
}

type vxlanDevice struct {
link *netlink.Vxlan
link *netlink.Vxlan
desiredMAC net.HardwareAddr
}

func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) {
Expand Down Expand Up @@ -79,7 +80,8 @@ func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) {
}

return &vxlanDevice{
link: link,
link: link,
desiredMAC: link.HardwareAddr,
}, nil
}

Expand Down Expand Up @@ -134,6 +136,20 @@ func (dev *vxlanDevice) Configure(ipn ip.IP4Net) error {
return fmt.Errorf("failed to set interface %s to UP state: %s", dev.link.Attrs().Name, err)
}

// Re-apply the desired MAC address after LinkSetUp, because some kernels
// regenerate the VXLAN MAC when the interface transitions to UP state.
if dev.desiredMAC != nil {
current, _ := netlink.LinkByIndex(dev.link.Index)
if current != nil && !macEqual(current.Attrs().HardwareAddr, dev.desiredMAC) {
log.Infof("MAC changed after LinkSetUp (got %s, want %s), re-applying", current.Attrs().HardwareAddr, dev.desiredMAC)
if err := netlink.LinkSetHardwareAddr(dev.link, dev.desiredMAC); err != nil {
log.Warningf("failed to re-apply MAC on %s: %v", dev.link.Name, err)
} else {
dev.link.HardwareAddr = dev.desiredMAC
}
}
}

// explicitly add a route since there might be a route for a subnet already
// installed by Docker and then it won't get auto added
route := netlink.Route{
Expand Down
16 changes: 16 additions & 0 deletions host/libcontainer_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/flynn/flynn/pkg/shutdown"
"github.com/flynn/flynn/pkg/syslog/rfc5424"
"github.com/flynn/flynn/pkg/term"
"github.com/flynn/flynn/pkg/tufconfig"
"github.com/flynn/flynn/pkg/tufutil"
"github.com/flynn/flynn/pkg/verify"
tuf "github.com/flynn/go-tuf/client"
Expand Down Expand Up @@ -117,6 +118,21 @@ func NewLibcontainerBackend(config *LibcontainerConfig) (Backend, error) {
if err != nil {
return nil, fmt.Errorf("error initializing TUF client: %s", err)
}
// Update the local TUF metadata from the remote repository
// so that target lookups in Download() can find the targets.
// Initialize root keys if this is a fresh local store.
if _, err := tufClient.Update(); err != nil {
if err == tuf.ErrNoRootKeys {
if err := tufClient.Init(tufconfig.RootKeys, 1); err != nil {
return nil, fmt.Errorf("error initializing TUF root keys: %s", err)
}
if _, err := tufClient.Update(); err != nil && !tuf.IsLatestSnapshot(err) {
return nil, fmt.Errorf("error updating TUF metadata after init: %s", err)
}
} else if !tuf.IsLatestSnapshot(err) {
return nil, fmt.Errorf("error updating TUF metadata: %s", err)
}
}
l.tufClient = tufClient
}
l.httpClient = &http.Client{Transport: &http.Transport{
Expand Down
6 changes: 4 additions & 2 deletions pkg/postgres/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"strconv"
"time"

"github.com/jackc/pgx"
"github.com/inconshreveable/log15"
"github.com/jackc/pgx"
)

type Step func(*DBTx) error
Expand Down Expand Up @@ -39,7 +39,9 @@ func (m Migrations) Migrate(db *DB) error {
var initialized bool
for _, migration := range m {
if !initialized {
db.Exec("CREATE TABLE IF NOT EXISTS schema_migrations (id bigint PRIMARY KEY)")
if err := db.Exec("CREATE TABLE IF NOT EXISTS schema_migrations (id bigint PRIMARY KEY)"); err != nil {
return err
}
initialized = true
}

Expand Down
Loading
Loading