diff --git a/controlplane/provisioner/lakekeeper_k8s.go b/controlplane/provisioner/lakekeeper_k8s.go
index f854bdae..dc833c83 100644
--- a/controlplane/provisioner/lakekeeper_k8s.go
+++ b/controlplane/provisioner/lakekeeper_k8s.go
@@ -168,6 +168,48 @@ func (c *LakekeeperK8sClient) EnsureSecret(ctx context.Context, orgID string, da
 	return nil
 }
 
+// LakekeeperServiceAccountName is the per-org ServiceAccount the Lakekeeper
+// Deployment + migration Job run under. It matches the CR/Secret resource
+// name. The posthog-cloud-infra EKS Pod Identity association keys on this
+// exact (namespace, name) pair to bind a per-org IAM role, so changing this
+// convention requires a matching Terraform change.
+func LakekeeperServiceAccountName(orgID string) string {
+	return LakekeeperResourceName(orgID)
+}
+
+// EnsureServiceAccount creates the per-org ServiceAccount that the Lakekeeper
+// workload runs under. One SA per org — in a single shared namespace — so each
+// org's Lakekeeper can carry a distinct cloud identity (EKS Pod Identity)
+// scoped to its own object store. Returns an error if orgID fails
+// isValidOrgIDLabel or if Create fails with anything but AlreadyExists.
+//
+// The SA is intentionally bare: EKS Pod Identity binds the IAM role to the
+// (namespace, serviceAccount) pair on the AWS side, so no IRSA role-arn
+// annotation is needed. An existing SA is left untouched on re-runs, so
+// out-of-band annotations (e.g. an IRSA role-arn on IRSA clusters) survive.
+func (c *LakekeeperK8sClient) EnsureServiceAccount(ctx context.Context, orgID string) error {
+	if !isValidOrgIDLabel(orgID) {
+		return fmt.Errorf("EnsureServiceAccount: orgID %q is not a valid K8s label value", orgID)
+	}
+	name := LakekeeperServiceAccountName(orgID)
+	desired := &corev1.ServiceAccount{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: c.namespace,
+			Labels: map[string]string{
+				"app":                 "lakekeeper",
+				"duckgres/active-org": orgID,
+			},
+		},
+	}
+	sas := c.kubernetes.CoreV1().ServiceAccounts(c.namespace)
+	_, err := sas.Create(ctx, desired, metav1.CreateOptions{})
+	if err == nil || apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	return fmt.Errorf("create service account %s: %w", name, err)
+}
+
 // LakekeeperCRSpec carries the inputs we need to render a Lakekeeper CR.
 // One CR per org. PG connection points at the org's existing managed-warehouse
 // Aurora cluster, with the lakekeeper_ database created by
@@ -190,6 +232,13 @@ type LakekeeperCRSpec struct {
 	// local/dev where PG runs without TLS.
 	PGSSLMode string
 
+	// ServiceAccountName binds the Lakekeeper pod (and migration Job) to a
+	// specific ServiceAccount via the operator's spec.serviceAccountName
+	// field (a PostHog-fork addition). Empty falls back to the namespace
+	// default. We set it to the per-org SA so each org's Lakekeeper carries
+	// its own EKS Pod Identity for isolated object-store access.
+	ServiceAccountName string
+
 	// KubernetesAuthAudiences, when non-empty, enables the operator's
 	// `authentication.kubernetes` mode with the given audiences. The
 	// duckling's projected SA token must carry one of these audiences for
@@ -277,6 +326,12 @@ func (c *LakekeeperK8sClient) EnsureCR(ctx context.Context, spec LakekeeperCRSpe
 			},
 		},
 	}
+	// Bind the workload to the per-org ServiceAccount when set. Maps to the
+	// operator's spec.serviceAccountName (PostHog-fork field); empty leaves it
+	// unset so the operator falls back to the namespace default.
+	if spec.ServiceAccountName != "" {
+		cr.Object["spec"].(map[string]interface{})["serviceAccountName"] = spec.ServiceAccountName
+	}
 	// Optional: enable Kubernetes SA-token authentication. The operator
 	// turns this into LAKEKEEPER__K8S_AUTH_ENABLED=true +
 	// LAKEKEEPER__K8S_AUTH_AUDIENCES=, which makes Lakekeeper validate
diff --git a/controlplane/provisioner/lakekeeper_k8s_test.go b/controlplane/provisioner/lakekeeper_k8s_test.go
index dc0c0c63..06d2d303 100644
--- a/controlplane/provisioner/lakekeeper_k8s_test.go
+++ b/controlplane/provisioner/lakekeeper_k8s_test.go
@@ -95,6 +95,86 @@ func assertSecretData(t *testing.T, s *corev1.Secret, want LakekeeperSecretData)
 	}
 }
 
+func TestEnsureServiceAccount_CreateAndIdempotent(t *testing.T) {
+	c, _, kc := newFakeLakekeeperClient()
+	ctx := context.Background()
+
+	if err := c.EnsureServiceAccount(ctx, "acme"); err != nil {
+		t.Fatalf("EnsureServiceAccount: %v", err)
+	}
+	sa, err := kc.CoreV1().ServiceAccounts("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("get SA: %v", err)
+	}
+	if sa.Labels["duckgres/active-org"] != "acme" {
+		t.Errorf("active-org label = %q, want acme", sa.Labels["duckgres/active-org"])
+	}
+	// Re-run must not error (AlreadyExists is swallowed).
+	if err := c.EnsureServiceAccount(ctx, "acme"); err != nil {
+		t.Fatalf("EnsureServiceAccount re-run: %v", err)
+	}
+}
+
+func TestEnsureServiceAccount_RejectsBadOrgID(t *testing.T) {
+	c, _, _ := newFakeLakekeeperClient()
+	if err := c.EnsureServiceAccount(context.Background(), "bad/org id"); err == nil {
+		t.Fatal("expected error for invalid org ID")
+	}
+}
+
+func TestEnsureCR_SetsServiceAccountNameWhenProvided(t *testing.T) {
+	c, dc, _ := newFakeLakekeeperClient()
+	ctx := context.Background()
+	base := LakekeeperCRSpec{
+		OrgID:      "acme",
+		Image:      "quay.io/lakekeeper/catalog:0.11.6",
+		PGHost:     "acme-pg.local",
+		PGDatabase: "lakekeeper_acme",
+		SecretName: "lakekeeper-acme",
+		BaseURI:    "http://lakekeeper-acme.lakekeeper.svc:8181",
+	}
+
+	// With SA set → rendered into spec.
+	withSA := base
+	withSA.ServiceAccountName = "lakekeeper-acme"
+	if err := c.EnsureCR(ctx, withSA); err != nil {
+		t.Fatalf("EnsureCR: %v", err)
+	}
+	got, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("get CR: %v", err)
+	}
+	specMap := got.Object["spec"].(map[string]interface{})
+	if specMap["serviceAccountName"] != "lakekeeper-acme" {
+		t.Errorf("spec.serviceAccountName = %v, want lakekeeper-acme", specMap["serviceAccountName"])
+	}
+}
+
+func TestEnsureCR_OmitsServiceAccountNameWhenEmpty(t *testing.T) {
+	c, dc, _ := newFakeLakekeeperClient()
+	ctx := context.Background()
+	spec := LakekeeperCRSpec{
+		OrgID:      "acme",
+		Image:      "quay.io/lakekeeper/catalog:0.11.6",
+		PGHost:     "acme-pg.local",
+		PGDatabase: "lakekeeper_acme",
+		SecretName: "lakekeeper-acme",
+		BaseURI:    "http://lakekeeper-acme.lakekeeper.svc:8181",
+		// ServiceAccountName intentionally empty.
+	}
+	if err := c.EnsureCR(ctx, spec); err != nil {
+		t.Fatalf("EnsureCR: %v", err)
+	}
+	got, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("get CR: %v", err)
+	}
+	specMap := got.Object["spec"].(map[string]interface{})
+	if _, present := specMap["serviceAccountName"]; present {
+		t.Errorf("spec.serviceAccountName should be omitted when empty, got %v", specMap["serviceAccountName"])
+	}
+}
+
 func TestEnsureCR_ValidatesRequiredFields(t *testing.T) {
 	c, _, _ := newFakeLakekeeperClient()
 	err := c.EnsureCR(context.Background(), LakekeeperCRSpec{OrgID: "acme"})
diff --git a/controlplane/provisioner/lakekeeper_provisioner.go b/controlplane/provisioner/lakekeeper_provisioner.go
index 530ffa97..0c673a7a 100644
--- a/controlplane/provisioner/lakekeeper_provisioner.go
+++ b/controlplane/provisioner/lakekeeper_provisioner.go
@@ -185,6 +185,14 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
 		return fmt.Errorf("ensure lakekeeper pg role: %w", err)
 	}
 
+	// 2c. Ensure the per-org ServiceAccount the Lakekeeper pod runs under.
+	// Must exist before the CR so the operator's Deployment + migration Job
+	// can mount it. Each org gets its own SA (in the shared namespace) so it
+	// can carry a distinct EKS Pod Identity scoped to that org's bucket.
+	if err := p.k8s.EnsureServiceAccount(ctx, w.OrgID); err != nil {
+		return fmt.Errorf("ensure lakekeeper service account: %w", err)
+	}
+
 	// 3. Apply the Lakekeeper CR pointing at the org's PG + the Secret.
 	pgPort := in.PGPort
 	if pgPort == 0 {
@@ -200,6 +208,7 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
 		SecretName:              secretName,
 		BaseURI:                 baseURL,
 		PGSSLMode:               in.PGSSLMode,
+		ServiceAccountName:      LakekeeperServiceAccountName(w.OrgID),
 		KubernetesAuthAudiences: in.KubernetesAuthAudiences,
 	}); err != nil {
 		return fmt.Errorf("ensure lakekeeper cr: %w", err)