Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
with: { go-version: stable }

- name: Ensure go.mod is tidy
run: go mod tidy && git diff --exit-code -- go.mod
run: go mod tidy --diff
- name: Ensure generated files are committed
run: make check-generate
- run: make check
Expand Down
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,15 @@ undeploy: ## Undeploy the PostgreSQL Operator

.PHONY: deploy-dev
deploy-dev: ## Deploy the PostgreSQL Operator locally
deploy-dev: PGO_FEATURE_GATES ?= "AllAlpha=true"
deploy-dev: get-pgmonitor
deploy-dev: createnamespaces
kubectl apply --server-side -k ./config/dev
hack/create-kubeconfig.sh postgres-operator pgo
env \
QUERIES_CONFIG_DIR="${QUERIES_CONFIG_DIR}" \
CRUNCHY_DEBUG=true \
PGO_FEATURE_GATES="${PGO_FEATURE_GATES}" \
CHECK_FOR_UPGRADES='$(if $(CHECK_FOR_UPGRADES),$(CHECK_FOR_UPGRADES),false)' \
QUERIES_CONFIG_DIR='$(QUERIES_CONFIG_DIR)' \
CRUNCHY_DEBUG="$${CRUNCHY_DEBUG:-true}" \
PGO_FEATURE_GATES="$${PGO_FEATURE_GATES:-AllAlpha=true}" \
CHECK_FOR_UPGRADES="$${CHECK_FOR_UPGRADES:-false}" \
KUBECONFIG=hack/.kube/postgres-operator/pgo \
PGO_NAMESPACE='postgres-operator' \
PGO_INSTALLER='deploy-dev' \
Expand Down
108 changes: 62 additions & 46 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"

Expand All @@ -40,31 +41,66 @@ import (
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

// assertNoError panics when err is not nil.
func assertNoError(err error) {
// must panics when err is not nil.
func must(err error) { need(0, err) }
func need[V any](v V, err error) V {
if err != nil {
panic(err)
}
return v
}

func initClient() (*rest.Config, error) {
config, err := runtime.GetConfig()

if err == nil && userAgent == "" {
err = errors.New("call initVersion first")
}
if err == nil {
config.UserAgent = userAgent
config.Wrap(otelTransportWrapper())

// Log Kubernetes API warnings encountered by client-go at a high verbosity.
// See [rest.WarningLogger].
handler := runtime.WarningHandler(func(ctx context.Context, code int, _ string, message string) {
if code == 299 && len(message) != 0 {
logging.FromContext(ctx).V(2).WithName("client-go").Info(message)
}
})
config.WarningHandler = handler
config.WarningHandlerWithContext = handler
}

return config, err
}

func initLogging() {
// Configure a singleton that treats logging.Logger.V(1) as logrus.DebugLevel.
debug := strings.TrimSpace(os.Getenv("CRUNCHY_DEBUG"))

var verbosity int
if strings.EqualFold(os.Getenv("CRUNCHY_DEBUG"), "true") {
if strings.EqualFold(debug, "true") {
verbosity = 1
} else if i, err := strconv.Atoi(debug); err == nil && i > 0 {
verbosity = i
}

// Configure a singleton that treats logging.Logger.V(1) as logrus.DebugLevel.
logging.SetLogSink(logging.Logrus(os.Stdout, versionString, 1, verbosity))

global := logging.FromContext(context.Background())
runtime.SetLogger(global)

// [k8s.io/client-go/tools/leaderelection] logs to the global [klog] instance.
// - https://github.com/kubernetes-sigs/controller-runtime/issues/2656
klog.SetLoggerWithOptions(global, klog.ContextualLogger(true))
}

//+kubebuilder:rbac:groups="coordination.k8s.io",resources="leases",verbs={get,create,update,watch}
//+kubebuilder:rbac:groups="authentication.k8s.io",resources="tokenreviews",verbs={create}
//+kubebuilder:rbac:groups="authorization.k8s.io",resources="subjectaccessreviews",verbs={create}

func initManager(ctx context.Context) (runtime.Options, error) {
log := logging.FromContext(ctx)
log := logging.FromContext(ctx).WithName("manager")

options := runtime.Options{}
options.Cache.SyncPeriod = initialize.Pointer(time.Hour)
Expand Down Expand Up @@ -173,7 +209,7 @@ func main() {
}

features := feature.NewGate()
assertNoError(features.Set(os.Getenv("PGO_FEATURE_GATES")))
must(features.Set(os.Getenv("PGO_FEATURE_GATES")))

running = feature.NewContext(running, features)
log.Info("feature gates",
Expand All @@ -183,32 +219,18 @@ func main() {
"enabled", feature.ShowEnabled(running))

// Initialize OpenTelemetry and flush data when there is a panic.
otelFinish, err := initOpenTelemetry(running)
assertNoError(err)
otelFinish := need(initOpenTelemetry(running))
defer func(ctx context.Context) { _ = otelFinish(ctx) }(running)

tracing.SetDefaultTracer(tracing.New("github.com/CrunchyData/postgres-operator"))

cfg, err := runtime.GetConfig()
assertNoError(err)

cfg.UserAgent = userAgent
cfg.Wrap(otelTransportWrapper())

// TODO(controller-runtime): Set config.WarningHandler instead after v0.19.0.
// Configure client-go to suppress warnings when warning headers are encountered. This prevents
// warnings from being logged over and over again during reconciliation (e.g. this will suppress
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

k8s, err := kubernetes.NewDiscoveryRunner(cfg)
assertNoError(err)
assertNoError(k8s.Read(running))

// Load Kubernetes client configuration and ensure it works.
config := need(initClient())
k8s := need(kubernetes.NewDiscoveryRunner(config))
must(k8s.Read(running))
log.Info("connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())

options, err := initManager(running)
assertNoError(err)
options := need(initManager(running))

// Add to the Context that Manager passes to Reconciler.Start, Runnable.Start,
// and eventually Reconciler.Reconcile.
Expand All @@ -219,51 +241,44 @@ func main() {
return ctx
}

mgr, err := runtime.NewManager(cfg, options)
assertNoError(err)
assertNoError(mgr.Add(k8s))
manager := need(runtime.NewManager(config, options))
must(manager.Add(k8s))

registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), stopRunning)
assertNoError(err)
assertNoError(mgr.Add(registrar))
registrar := need(registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), stopRunning))
must(manager.Add(registrar))
token, _ := registrar.CheckToken()

// add all PostgreSQL Operator controllers to the runtime manager
addControllersToManager(mgr, log, registrar)
addControllersToManager(manager, log, registrar)

if features.Enabled(feature.BridgeIdentifiers) {
url := os.Getenv("PGO_BRIDGE_URL")
constructor := func() *bridge.Client {
client := bridge.NewClient(os.Getenv("PGO_BRIDGE_URL"), versionString)
client := bridge.NewClient(url, versionString)
client.Transport = otelTransportWrapper()(http.DefaultTransport)
return client
}

assertNoError(bridge.ManagedInstallationReconciler(mgr, constructor))
must(bridge.ManagedInstallationReconciler(manager, constructor))
}

// Enable upgrade checking
upgradeCheckingDisabled := strings.EqualFold(os.Getenv("CHECK_FOR_UPGRADES"), "false")
if !upgradeCheckingDisabled {
log.Info("upgrade checking enabled")
// get the URL for the check for upgrades endpoint if set in the env
assertNoError(
upgradecheck.ManagedScheduler(
mgr,
os.Getenv("CHECK_FOR_UPGRADES_URL"),
versionString,
token,
))
url := os.Getenv("CHECK_FOR_UPGRADES_URL")
must(upgradecheck.ManagedScheduler(manager, url, versionString, token))
} else {
log.Info("upgrade checking disabled")
}

// Enable health probes
assertNoError(mgr.AddHealthzCheck("health", healthz.Ping))
assertNoError(mgr.AddReadyzCheck("check", healthz.Ping))
must(manager.AddHealthzCheck("health", healthz.Ping))
must(manager.AddReadyzCheck("check", healthz.Ping))

// Start the manager and wait for its context to be canceled.
stopped := make(chan error, 1)
go func() { stopped <- mgr.Start(running) }()
go func() { stopped <- manager.Start(running) }()
<-running.Done()

// Set a deadline for graceful termination.
Expand All @@ -272,6 +287,7 @@ func main() {
defer cancel()

// Wait for the manager to return or the deadline to pass.
var err error
select {
case err = <-stopped:
case <-stopping.Done():
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
k8s.io/apimachinery v0.33.4
k8s.io/client-go v0.33.4
k8s.io/component-base v0.33.4
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff
sigs.k8s.io/controller-runtime v0.21.0
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3
Expand Down Expand Up @@ -127,7 +128,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.33.0 // indirect
k8s.io/apiserver v0.33.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
sigs.k8s.io/controller-tools v0.17.3 // indirect
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/runtime/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package runtime
import (
"context"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -74,3 +75,15 @@ func (fn ClientPatch) Patch(ctx context.Context, obj client.Object, patch client
func (fn ClientUpdate) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return fn(ctx, obj, opts...)
}

type WarningHandler func(ctx context.Context, code int, agent string, text string)

func (fn WarningHandler) HandleWarningHeader(code int, agent string, text string) {
fn(context.Background(), code, agent, text)
}
func (fn WarningHandler) HandleWarningHeaderWithContext(ctx context.Context, code int, agent string, text string) {
fn(ctx, code, agent, text)
}

var _ rest.WarningHandler = WarningHandler(nil)
var _ rest.WarningHandlerWithContext = WarningHandler(nil)
Loading