Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions controlplane/provisioner/lakekeeper_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,48 @@ func (c *LakekeeperK8sClient) EnsureSecret(ctx context.Context, orgID string, da
return nil
}

// LakekeeperServiceAccountName is the per-org ServiceAccount the Lakekeeper
// Deployment + migration Job run under. It matches the CR/Secret resource
// name. The posthog-cloud-infra EKS Pod Identity association keys on this
// exact (namespace, name) pair to bind a per-org IAM role, so changing this
// convention requires a matching Terraform change.
func LakekeeperServiceAccountName(orgID string) string {
return LakekeeperResourceName(orgID)
}

// EnsureServiceAccount creates the per-org ServiceAccount that the Lakekeeper
// workload runs under. One SA per org — in a single shared namespace — so each
// org's Lakekeeper can carry a distinct cloud identity (EKS Pod Identity)
// scoped to its own object store.
//
// The SA is intentionally bare: EKS Pod Identity binds the IAM role to the
// (namespace, serviceAccount) pair on the AWS side, so no IRSA role-arn
// annotation is needed here. On re-runs we leave an existing SA untouched
// rather than overwriting it, so any annotations added out-of-band (e.g. an
// IRSA role-arn, if a cluster uses IRSA instead of Pod Identity) survive.
func (c *LakekeeperK8sClient) EnsureServiceAccount(ctx context.Context, orgID string) error {
if !isValidOrgIDLabel(orgID) {
return fmt.Errorf("EnsureServiceAccount: orgID %q is not a valid K8s label value", orgID)
}
name := LakekeeperServiceAccountName(orgID)
desired := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: c.namespace,
Labels: map[string]string{
"app": "lakekeeper",
"duckgres/active-org": orgID,
},
},
}
sas := c.kubernetes.CoreV1().ServiceAccounts(c.namespace)
_, err := sas.Create(ctx, desired, metav1.CreateOptions{})
if err == nil || apierrors.IsAlreadyExists(err) {
return nil
}
return fmt.Errorf("create service account %s: %w", name, err)
}

// LakekeeperCRSpec carries the inputs we need to render a Lakekeeper CR.
// One CR per org. PG connection points at the org's existing managed-warehouse
// Aurora cluster, with the lakekeeper_<orgid> database created by
Expand All @@ -190,6 +232,13 @@ type LakekeeperCRSpec struct {
// local/dev where PG runs without TLS.
PGSSLMode string

// ServiceAccountName binds the Lakekeeper pod (and migration Job) to a
// specific ServiceAccount via the operator's spec.serviceAccountName
// field (a PostHog-fork addition). Empty falls back to the namespace
// default. We set it to the per-org SA so each org's Lakekeeper carries
// its own EKS Pod Identity for isolated object-store access.
ServiceAccountName string

// KubernetesAuthAudiences, when non-empty, enables the operator's
// `authentication.kubernetes` mode with the given audiences. The
// duckling's projected SA token must carry one of these audiences for
Expand Down Expand Up @@ -277,6 +326,12 @@ func (c *LakekeeperK8sClient) EnsureCR(ctx context.Context, spec LakekeeperCRSpe
},
},
}
// Bind the workload to the per-org ServiceAccount when set. Maps to the
// operator's spec.serviceAccountName (PostHog-fork field); empty leaves it
// unset so the operator falls back to the namespace default.
if spec.ServiceAccountName != "" {
cr.Object["spec"].(map[string]interface{})["serviceAccountName"] = spec.ServiceAccountName
}
// Optional: enable Kubernetes SA-token authentication. The operator
// turns this into LAKEKEEPER__K8S_AUTH_ENABLED=true +
// LAKEKEEPER__K8S_AUTH_AUDIENCES=<csv>, which makes Lakekeeper validate
Expand Down
80 changes: 80 additions & 0 deletions controlplane/provisioner/lakekeeper_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,86 @@ func assertSecretData(t *testing.T, s *corev1.Secret, want LakekeeperSecretData)
}
}

func TestEnsureServiceAccount_CreateAndIdempotent(t *testing.T) {
c, _, kc := newFakeLakekeeperClient()
ctx := context.Background()

if err := c.EnsureServiceAccount(ctx, "acme"); err != nil {
t.Fatalf("EnsureServiceAccount: %v", err)
}
sa, err := kc.CoreV1().ServiceAccounts("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
if err != nil {
t.Fatalf("get SA: %v", err)
}
if sa.Labels["duckgres/active-org"] != "acme" {
t.Errorf("active-org label = %q, want acme", sa.Labels["duckgres/active-org"])
}
// Re-run must not error (AlreadyExists is swallowed).
if err := c.EnsureServiceAccount(ctx, "acme"); err != nil {
t.Fatalf("EnsureServiceAccount re-run: %v", err)
}
}

func TestEnsureServiceAccount_RejectsBadOrgID(t *testing.T) {
c, _, _ := newFakeLakekeeperClient()
if err := c.EnsureServiceAccount(context.Background(), "bad/org id"); err == nil {
t.Fatal("expected error for invalid org ID")
}
}

func TestEnsureCR_SetsServiceAccountNameWhenProvided(t *testing.T) {
c, dc, _ := newFakeLakekeeperClient()
ctx := context.Background()
base := LakekeeperCRSpec{
OrgID: "acme",
Image: "quay.io/lakekeeper/catalog:0.11.6",
PGHost: "acme-pg.local",
PGDatabase: "lakekeeper_acme",
SecretName: "lakekeeper-acme",
BaseURI: "http://lakekeeper-acme.lakekeeper.svc:8181",
}

// With SA set → rendered into spec.
withSA := base
withSA.ServiceAccountName = "lakekeeper-acme"
if err := c.EnsureCR(ctx, withSA); err != nil {
t.Fatalf("EnsureCR: %v", err)
}
got, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
if err != nil {
t.Fatalf("get CR: %v", err)
}
specMap := got.Object["spec"].(map[string]interface{})
if specMap["serviceAccountName"] != "lakekeeper-acme" {
t.Errorf("spec.serviceAccountName = %v, want lakekeeper-acme", specMap["serviceAccountName"])
}
}

func TestEnsureCR_OmitsServiceAccountNameWhenEmpty(t *testing.T) {
c, dc, _ := newFakeLakekeeperClient()
ctx := context.Background()
spec := LakekeeperCRSpec{
OrgID: "acme",
Image: "quay.io/lakekeeper/catalog:0.11.6",
PGHost: "acme-pg.local",
PGDatabase: "lakekeeper_acme",
SecretName: "lakekeeper-acme",
BaseURI: "http://lakekeeper-acme.lakekeeper.svc:8181",
// ServiceAccountName intentionally empty.
}
if err := c.EnsureCR(ctx, spec); err != nil {
t.Fatalf("EnsureCR: %v", err)
}
got, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Get(ctx, "lakekeeper-acme", metav1.GetOptions{})
if err != nil {
t.Fatalf("get CR: %v", err)
}
specMap := got.Object["spec"].(map[string]interface{})
if _, present := specMap["serviceAccountName"]; present {
t.Errorf("spec.serviceAccountName should be omitted when empty, got %v", specMap["serviceAccountName"])
}
}

func TestEnsureCR_ValidatesRequiredFields(t *testing.T) {
c, _, _ := newFakeLakekeeperClient()
err := c.EnsureCR(context.Background(), LakekeeperCRSpec{OrgID: "acme"})
Expand Down
9 changes: 9 additions & 0 deletions controlplane/provisioner/lakekeeper_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
return fmt.Errorf("ensure lakekeeper pg role: %w", err)
}

// 2c. Ensure the per-org ServiceAccount the Lakekeeper pod runs under.
// Must exist before the CR so the operator's Deployment + migration Job
// can mount it. Each org gets its own SA (in the shared namespace) so it
// can carry a distinct EKS Pod Identity scoped to that org's bucket.
if err := p.k8s.EnsureServiceAccount(ctx, w.OrgID); err != nil {
return fmt.Errorf("ensure lakekeeper service account: %w", err)
}

// 3. Apply the Lakekeeper CR pointing at the org's PG + the Secret.
pgPort := in.PGPort
if pgPort == 0 {
Expand All @@ -200,6 +208,7 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
SecretName: secretName,
BaseURI: baseURL,
PGSSLMode: in.PGSSLMode,
ServiceAccountName: LakekeeperServiceAccountName(w.OrgID),
KubernetesAuthAudiences: in.KubernetesAuthAudiences,
}); err != nil {
return fmt.Errorf("ensure lakekeeper cr: %w", err)
Expand Down
Loading