Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions internal/activityprocessor/policycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,11 +464,27 @@ func (r *CompiledRule) EvaluateEventMatch(eventMap map[string]any) (bool, error)
// EvaluateCompiledAuditRules evaluates pre-compiled audit rules against an audit event.
// Returns the generated Activity, the matching rule index, and any error.
// Returns (nil, -1, nil) if no rule matched.
//
// Use EvaluateCompiledAuditRulesWithResolver to also enrich the resulting
// activity with user display names; this convenience wrapper passes nil.
func EvaluateCompiledAuditRules(
policy *CompiledPolicy,
auditMap map[string]any,
audit *auditv1.Event,
resolveKind processor.KindResolver,
) (*v1alpha1.Activity, int, error) {
return EvaluateCompiledAuditRulesWithResolver(policy, auditMap, audit, resolveKind, nil)
}

// EvaluateCompiledAuditRulesWithResolver is like EvaluateCompiledAuditRules
// but enriches the activity actor and any User-typed link targets with
// display names looked up via resolver.
func EvaluateCompiledAuditRulesWithResolver(
policy *CompiledPolicy,
auditMap map[string]any,
audit *auditv1.Event,
resolveKind processor.KindResolver,
resolver processor.UserResolver,
) (*v1alpha1.Activity, int, error) {
for i := range policy.AuditRules {
rule := &policy.AuditRules[i]
Expand All @@ -491,8 +507,9 @@ func EvaluateCompiledAuditRules(
}

builder := &processor.ActivityBuilder{
APIGroup: policy.APIGroup,
Kind: policy.Kind,
APIGroup: policy.APIGroup,
Kind: policy.Kind,
UserResolver: resolver,
}
activity, err := builder.BuildFromAudit(audit, summary, links, resolveKind)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion internal/activityprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ type Processor struct {
// dlqRetryController handles automatic retry of DLQ events.
dlqRetryController *DLQRetryController

// userResolver enriches activities with iam User display names. nil
// disables enrichment; activities are emitted with raw emails/IDs.
userResolver processor.UserResolver

wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -393,6 +397,10 @@ func (p *Processor) Start(ctx context.Context) error {
// Create event emitter for health reporting
p.eventEmitter = NewEventEmitter(k8sClient, recorder)

// Wire a cached iam User resolver so activities are enriched with
// human-readable display names. Failures fall back to email/UID.
p.userResolver = processor.NewCachedUserResolver(NewIAMUserResolver(k8sClient), 0, 0)

// Build NATS connection options
natsOpts := []nats.Option{
nats.Name("activity-processor"),
Expand Down Expand Up @@ -1034,7 +1042,7 @@ func (p *Processor) processMessage(msg *nats.Msg) error {

// evaluateCompiledAuditRules evaluates audit rules using pre-compiled CEL programs.
func (p *Processor) evaluateCompiledAuditRules(policy *CompiledPolicy, auditMap map[string]any, audit *auditv1.Event) (*v1alpha1.Activity, int, error) {
return EvaluateCompiledAuditRules(policy, auditMap, audit, p.resourceToKind)
return EvaluateCompiledAuditRulesWithResolver(policy, auditMap, audit, p.resourceToKind, p.userResolver)
}

// auditToMap converts an audit event to a map for CEL evaluation.
Expand Down
101 changes: 101 additions & 0 deletions internal/activityprocessor/userresolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package activityprocessor

import (
"context"
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"

"go.miloapis.com/activity/internal/processor"
)

// userGVK identifies the iam User custom resource we resolve display names
// from. The activity processor never serves these objects, so we query them
// as Unstructured to avoid pulling in the milo iam Go types.
var userGVK = schema.GroupVersionKind{
Group: "iam.miloapis.com",
Version: "v1alpha1",
Kind: "User",
}

// IAMUserResolver implements processor.UserResolver against an iam User CR
// store reached through a controller-runtime client. It is safe for
// concurrent use; wrap with processor.NewCachedUserResolver to add caching
// and per-key single-flight.
type IAMUserResolver struct {
Client client.Client
}

// NewIAMUserResolver returns a resolver that fetches iam Users via c.
func NewIAMUserResolver(c client.Client) *IAMUserResolver {
return &IAMUserResolver{Client: c}
}

// LookupByEmail finds the first iam User whose spec.email matches the given
// address. Returns ok=false when no match is found or email is empty.
func (r *IAMUserResolver) LookupByEmail(ctx context.Context, email string) (processor.UserInfo, bool, error) {
if email == "" || r == nil || r.Client == nil {
return processor.UserInfo{}, false, nil
}

list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(schema.GroupVersionKind{
Group: userGVK.Group,
Version: userGVK.Version,
Kind: userGVK.Kind + "List",
})

// Most clusters do not index spec.email server-side; list all and filter
// in process. The cached wrapper amortizes this across calls; if scale
// becomes a concern, register a field indexer for spec.email in the
// manager's cache.
if err := r.Client.List(ctx, list); err != nil {
return processor.UserInfo{}, false, fmt.Errorf("list iam users: %w", err)
}

for i := range list.Items {
item := &list.Items[i]
got, _, _ := unstructured.NestedString(item.Object, "spec", "email")
if got == email {
return userInfoFromUnstructured(item), true, nil
}
}

return processor.UserInfo{}, false, nil
}

// LookupByName fetches an iam User by metadata.name and returns its display
// fields. Returns ok=false on NotFound.
func (r *IAMUserResolver) LookupByName(ctx context.Context, name string) (processor.UserInfo, bool, error) {
if name == "" || r == nil || r.Client == nil {
return processor.UserInfo{}, false, nil
}

obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(userGVK)

if err := r.Client.Get(ctx, client.ObjectKey{Name: name}, obj); err != nil {
if apierrors.IsNotFound(err) {
return processor.UserInfo{}, false, nil
}
return processor.UserInfo{}, false, fmt.Errorf("get iam user %q: %w", name, err)
}

return userInfoFromUnstructured(obj), true, nil
}

func userInfoFromUnstructured(obj *unstructured.Unstructured) processor.UserInfo {
given, _, _ := unstructured.NestedString(obj.Object, "spec", "givenName")
family, _, _ := unstructured.NestedString(obj.Object, "spec", "familyName")
email, _, _ := unstructured.NestedString(obj.Object, "spec", "email")
return processor.UserInfo{
Name: obj.GetName(),
GivenName: given,
FamilyName: family,
Email: email,
UID: string(obj.GetUID()),
}
}
116 changes: 115 additions & 1 deletion internal/processor/activity.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package processor

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -15,6 +17,10 @@ import (
"go.miloapis.com/activity/pkg/apis/activity/v1alpha1"
)

// iamGroup is the API group for Milo IAM resources we enrich with user
// display names.
const iamGroup = "iam.miloapis.com"

// activityName generates a deterministic activity name from the origin event
// identifier and the policy's resource target. The same input always produces
// the same name, enabling NATS message deduplication on retries.
Expand All @@ -35,6 +41,10 @@ type ActivityBuilder struct {
// Resource information from the policy
APIGroup string
Kind string

// UserResolver is consulted (when non-nil) to enrich the actor and any
// User-typed link targets with human-readable display names.
UserResolver UserResolver
}

// BuildFromAudit constructs an Activity from an audit event.
Expand Down Expand Up @@ -64,8 +74,9 @@ func (b *ActivityBuilder) BuildFromAudit(
resourceUID := extractResponseUID(audit.ResponseObject)

// Classify change source and resolve actor
ctx := context.Background()
changeSource := ClassifyChangeSource(audit.User)
actor := ResolveActor(audit.User)
actor := ResolveActorWithResolver(ctx, audit.User, b.UserResolver)
tenant := ExtractTenant(audit.User)

// Generate activity name
Expand All @@ -77,6 +88,10 @@ func (b *ActivityBuilder) BuildFromAudit(
return nil, fmt.Errorf("%w: %v", ErrActivityBuild, err)
}

// Enrich: replace actor email with display name in summary, attach actor
// link, and hydrate any User-typed link targets with display names.
summary, activityLinks = enrichSummaryWithDisplayNames(ctx, summary, actor, activityLinks, b.UserResolver)

return &v1alpha1.Activity{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Expand Down Expand Up @@ -115,6 +130,101 @@ func (b *ActivityBuilder) BuildFromAudit(
}, nil
}

// enrichSummaryWithDisplayNames rewrites the summary to use human-readable
// display names for the actor and any User-typed link targets, and appends
// link metadata so the UI can render an email/UID tooltip.
//
// Behavior:
// - When the actor has a DisplayName, the first occurrence of the actor's
// Name (typically an email) in the summary is replaced with the
// DisplayName, and a synthetic actor link is appended carrying the
// DisplayName, Email, and UID.
// - For each existing link whose resource is an iam User, the resolver is
// queried by the link's resource name; on hit, the link's Marker is
// replaced in the summary with the user's DisplayName and the link's
// DisplayName/Email fields are populated.
//
// Returns the rewritten summary and links. When resolver is nil or no
// matches occur, the inputs are returned unchanged.
func enrichSummaryWithDisplayNames(
ctx context.Context,
summary string,
actor v1alpha1.ActivityActor,
links []v1alpha1.ActivityLink,
resolver UserResolver,
) (string, []v1alpha1.ActivityLink) {
// Actor: if we have a display name distinct from the name, swap it into
// the summary. If the policy template already wrapped the actor with
// link() (so a link entry exists with marker == actor.Name), upgrade
// that entry in place; otherwise append a synthetic actor link so the
// UI can render the hover tooltip.
if actor.DisplayName != "" && actor.DisplayName != actor.Name && actor.Name != "" {
summaryHadActor := strings.Contains(summary, actor.Name)
if summaryHadActor {
summary = strings.Replace(summary, actor.Name, actor.DisplayName, 1)
}

upgraded := false
for i := range links {
if links[i].Marker == actor.Name {
links[i].Marker = actor.DisplayName
links[i].DisplayName = actor.DisplayName
if links[i].Email == "" {
links[i].Email = actor.Email
}
upgraded = true
break
}
}
if !upgraded && summaryHadActor {
links = append(links, v1alpha1.ActivityLink{
Marker: actor.DisplayName,
Resource: v1alpha1.ActivityResource{
APIGroup: iamGroup,
Kind: "User",
UID: actor.UID,
},
DisplayName: actor.DisplayName,
Email: actor.Email,
})
}
}

// User-typed link targets: hydrate via resolver and rewrite the summary.
if resolver != nil {
for i := range links {
link := &links[i]
if !isUserLink(link.Resource) {
continue
}
if link.Resource.Name == "" || link.DisplayName != "" {
continue
}
info, ok, err := resolver.LookupByName(ctx, link.Resource.Name)
if err != nil || !ok {
continue
}
displayName := info.DisplayName()
if displayName == "" {
continue
}
if link.Marker != "" && link.Marker != displayName {
summary = strings.Replace(summary, link.Marker, displayName, 1)
link.Marker = displayName
}
link.DisplayName = displayName
link.Email = info.Email
}
}

return summary, links
}

// isUserLink reports whether the resource targets an iam User CR.
func isUserLink(r v1alpha1.ActivityResource) bool {
return r.APIGroup == iamGroup && r.Kind == "User"
}

// extractResponseUID extracts the UID from an audit response object's metadata.
func extractResponseUID(responseObject *runtime.Unknown) string {
if responseObject == nil || len(responseObject.Raw) == 0 {
Expand Down Expand Up @@ -202,6 +312,10 @@ func (b *ActivityBuilder) BuildFromEvent(
return nil, fmt.Errorf("%w: %v", ErrActivityBuild, err)
}

// Hydrate User-typed links with display names (event actors are system
// components, so no actor enrichment is needed).
summary, activityLinks = enrichSummaryWithDisplayNames(context.Background(), summary, actor, activityLinks, b.UserResolver)

return &v1alpha1.Activity{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Expand Down
18 changes: 18 additions & 0 deletions internal/processor/classifier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package processor

import (
"context"
"strings"

authnv1 "k8s.io/api/authentication/v1"
Expand Down Expand Up @@ -38,6 +39,14 @@ const (
// - user: Human users authenticated via OIDC or other providers
// - system: Kubernetes controllers, service accounts, and other system components
func ResolveActor(user authnv1.UserInfo) v1alpha1.ActivityActor {
return ResolveActorWithResolver(context.Background(), user, nil)
}

// ResolveActorWithResolver behaves like ResolveActor but additionally
// populates ActivityActor.DisplayName for human users when resolver is non-nil
// and a matching User record is found. Resolver errors are silently ignored:
// the activity is still emitted with whatever data is available.
func ResolveActorWithResolver(ctx context.Context, user authnv1.UserInfo, resolver UserResolver) v1alpha1.ActivityActor {
actor := v1alpha1.ActivityActor{
UID: user.UID,
}
Expand All @@ -62,6 +71,15 @@ func ResolveActor(user authnv1.UserInfo) v1alpha1.ActivityActor {
actor.Name = "unknown"
}

// Enrich with display name for human users when a resolver is available.
if resolver != nil && actor.Type == ActorTypeUser && actor.Email != "" {
if info, ok, err := resolver.LookupByEmail(ctx, actor.Email); err == nil && ok {
if dn := info.DisplayName(); dn != "" {
actor.DisplayName = dn
}
}
}

return actor
}

Expand Down
Loading
Loading