Skip to content

Commit

Permalink
VAULT-14733: Refactor processClientRecord in activity log (#19933)
Browse files Browse the repository at this point in the history
  • Loading branch information
miagilepner committed Apr 4, 2023
1 parent 53cbcd3 commit 54904e4
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 63 deletions.
135 changes: 72 additions & 63 deletions vault/activity_log.go
Expand Up @@ -1872,6 +1872,12 @@ func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string
return ns.Path
}

type (
summaryByNamespace map[string]*processByNamespace
summaryByMount map[string]*processMount
summaryByMonth map[int64]*processMonth
)

type processCounts struct {
// entityID -> present
Entities map[string]struct{}
Expand All @@ -1889,6 +1895,23 @@ func newProcessCounts() *processCounts {
}
}

func (p *processCounts) add(client *activity.EntityRecord) {
if client.NonEntity {
p.NonEntities[client.ClientID] = struct{}{}
} else {
p.Entities[client.ClientID] = struct{}{}
}
}

func (p *processCounts) contains(client *activity.EntityRecord) bool {
if client.NonEntity {
_, ok := p.NonEntities[client.ClientID]
return ok
}
_, ok := p.Entities[client.ClientID]
return ok
}

type processMount struct {
Counts *processCounts
}
Expand All @@ -1899,105 +1922,91 @@ func newProcessMount() *processMount {
}
}

func (p *processMount) add(client *activity.EntityRecord) {
p.Counts.add(client)
}

func (s summaryByMount) add(client *activity.EntityRecord) {
if _, present := s[client.MountAccessor]; !present {
s[client.MountAccessor] = newProcessMount()
}
s[client.MountAccessor].add(client)
}

type processByNamespace struct {
Counts *processCounts
Mounts map[string]*processMount
Mounts summaryByMount
}

func newByNamespace() *processByNamespace {
return &processByNamespace{
Counts: newProcessCounts(),
Mounts: make(map[string]*processMount),
Mounts: make(summaryByMount),
}
}

func (p *processByNamespace) add(client *activity.EntityRecord) {
p.Counts.add(client)
p.Mounts.add(client)
}

func (s summaryByNamespace) add(client *activity.EntityRecord) {
if _, present := s[client.NamespaceID]; !present {
s[client.NamespaceID] = newByNamespace()
}
s[client.NamespaceID].add(client)
}

type processNewClients struct {
Counts *processCounts
Namespaces map[string]*processByNamespace
Namespaces summaryByNamespace
}

func newProcessNewClients() *processNewClients {
return &processNewClients{
Counts: newProcessCounts(),
Namespaces: make(map[string]*processByNamespace),
Namespaces: make(summaryByNamespace),
}
}

func (p *processNewClients) add(client *activity.EntityRecord) {
p.Counts.add(client)
p.Namespaces.add(client)
}

type processMonth struct {
Counts *processCounts
Namespaces map[string]*processByNamespace
Namespaces summaryByNamespace
NewClients *processNewClients
}

func newProcessMonth() *processMonth {
return &processMonth{
Counts: newProcessCounts(),
Namespaces: make(map[string]*processByNamespace),
Namespaces: make(summaryByNamespace),
NewClients: newProcessNewClients(),
}
}

// processClientRecord parses the client record e and stores the breakdowns in
// the maps provided.
func processClientRecord(e *activity.EntityRecord, byNamespace map[string]*processByNamespace, byMonth map[int64]*processMonth, startTime time.Time) {
if _, present := byNamespace[e.NamespaceID]; !present {
byNamespace[e.NamespaceID] = newByNamespace()
}

if _, present := byNamespace[e.NamespaceID].Mounts[e.MountAccessor]; !present {
byNamespace[e.NamespaceID].Mounts[e.MountAccessor] = newProcessMount()
}

if e.NonEntity {
byNamespace[e.NamespaceID].Counts.NonEntities[e.ClientID] = struct{}{}
byNamespace[e.NamespaceID].Mounts[e.MountAccessor].Counts.NonEntities[e.ClientID] = struct{}{}
} else {
byNamespace[e.NamespaceID].Counts.Entities[e.ClientID] = struct{}{}
byNamespace[e.NamespaceID].Mounts[e.MountAccessor].Counts.Entities[e.ClientID] = struct{}{}
}
func (p *processMonth) add(client *activity.EntityRecord) {
p.Counts.add(client)
p.NewClients.add(client)
p.Namespaces.add(client)
}

func (s summaryByMonth) add(client *activity.EntityRecord, startTime time.Time) {
monthTimestamp := timeutil.StartOfMonth(startTime).UTC().Unix()
if _, present := byMonth[monthTimestamp]; !present {
byMonth[monthTimestamp] = newProcessMonth()
}

if _, present := byMonth[monthTimestamp].Namespaces[e.NamespaceID]; !present {
byMonth[monthTimestamp].Namespaces[e.NamespaceID] = newByNamespace()
}

if _, present := byMonth[monthTimestamp].Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; !present {
byMonth[monthTimestamp].Namespaces[e.NamespaceID].Mounts[e.MountAccessor] = newProcessMount()
if _, present := s[monthTimestamp]; !present {
s[monthTimestamp] = newProcessMonth()
}
s[monthTimestamp].add(client)
}

if _, present := byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID]; !present {
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID] = newByNamespace()
}

if _, present := byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor]; !present {
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor] = newProcessMount()
}

// At first assume all the clients in the given month, as new.
// Before persisting this information to disk, clients that have
// activity in the previous months of a given billing cycle will be
// deleted.
if e.NonEntity == true {
byMonth[monthTimestamp].Counts.NonEntities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].Namespaces[e.NamespaceID].Counts.NonEntities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.NonEntities[e.ClientID] = struct{}{}

byMonth[monthTimestamp].NewClients.Counts.NonEntities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Counts.NonEntities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.NonEntities[e.ClientID] = struct{}{}
} else {
byMonth[monthTimestamp].Counts.Entities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].Namespaces[e.NamespaceID].Counts.Entities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.Entities[e.ClientID] = struct{}{}

byMonth[monthTimestamp].NewClients.Counts.Entities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Counts.Entities[e.ClientID] = struct{}{}
byMonth[monthTimestamp].NewClients.Namespaces[e.NamespaceID].Mounts[e.MountAccessor].Counts.Entities[e.ClientID] = struct{}{}
}
// processClientRecord parses the client record e and stores the breakdowns in
// the maps provided.
func processClientRecord(e *activity.EntityRecord, byNamespace summaryByNamespace, byMonth summaryByMonth, startTime time.Time) {
byNamespace.add(e)
byMonth.add(e, startTime)
}

// goroutine to process the request in the intent log, creating precomputed queries.
Expand Down
59 changes: 59 additions & 0 deletions vault/activity_log_test.go
Expand Up @@ -4233,3 +4233,62 @@ func TestActivityLog_partialMonthClientCountWithMultipleMountPaths(t *testing.T)
}
}
}

// TestActivityLog_processClientRecord calls processClientRecord for an entity and a non-entity record and verifies that
// the record is present in the namespace and month maps
func TestActivityLog_processClientRecord(t *testing.T) {
startTime := time.Now()
mount := "mount"
namespace := "namespace"
clientID := "client-id"
run := func(t *testing.T, isNonEntity bool) {
t.Helper()
record := &activity.EntityRecord{
MountAccessor: mount,
NamespaceID: namespace,
ClientID: clientID,
NonEntity: isNonEntity,
}
byNS := make(summaryByNamespace)
byMonth := make(summaryByMonth)
processClientRecord(record, byNS, byMonth, startTime)
require.Contains(t, byNS, namespace)
require.Contains(t, byNS[namespace].Mounts, mount)
monthIndex := timeutil.StartOfMonth(startTime).UTC().Unix()
require.Contains(t, byMonth, monthIndex)
require.Equal(t, byMonth[monthIndex].Namespaces, byNS)
require.Equal(t, byMonth[monthIndex].NewClients.Namespaces, byNS)

if isNonEntity {
require.Contains(t, byMonth[monthIndex].Counts.NonEntities, clientID)
require.NotContains(t, byMonth[monthIndex].Counts.Entities, clientID)

require.Contains(t, byMonth[monthIndex].NewClients.Counts.NonEntities, clientID)
require.NotContains(t, byMonth[monthIndex].NewClients.Counts.Entities, clientID)

require.Contains(t, byNS[namespace].Mounts[mount].Counts.NonEntities, clientID)
require.Contains(t, byNS[namespace].Counts.NonEntities, clientID)

require.NotContains(t, byNS[namespace].Mounts[mount].Counts.Entities, clientID)
require.NotContains(t, byNS[namespace].Counts.Entities, clientID)
} else {
require.Contains(t, byMonth[monthIndex].Counts.Entities, clientID)
require.NotContains(t, byMonth[monthIndex].Counts.NonEntities, clientID)

require.Contains(t, byMonth[monthIndex].NewClients.Counts.Entities, clientID)
require.NotContains(t, byMonth[monthIndex].NewClients.Counts.NonEntities, clientID)

require.Contains(t, byNS[namespace].Mounts[mount].Counts.Entities, clientID)
require.Contains(t, byNS[namespace].Counts.Entities, clientID)

require.NotContains(t, byNS[namespace].Mounts[mount].Counts.NonEntities, clientID)
require.NotContains(t, byNS[namespace].Counts.NonEntities, clientID)
}
}
t.Run("non entity", func(t *testing.T) {
run(t, true)
})
t.Run("entity", func(t *testing.T) {
run(t, false)
})
}

0 comments on commit 54904e4

Please sign in to comment.