Skip to content

Commit

Permalink
Enhance config sync
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Jul 7, 2023
1 parent 406657f commit 4324dc7
Show file tree
Hide file tree
Showing 19 changed files with 735 additions and 336 deletions.
2 changes: 1 addition & 1 deletion pkg/contracts/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Resource interface {

type Meta struct {
Id types.Binary `db:"id"`
PropertiesChecksum types.Binary `json:"-"`
PropertiesChecksum types.Binary `hash:"-"`
}

func (m *Meta) Checksum() types.Binary {
Expand Down
102 changes: 90 additions & 12 deletions pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (db *Database) DeleteStreamed(
defer runtime.HandleCrash()
defer close(ch)

return db.DeleteStreamed(ctx, relation, ch, features...)
return db.DeleteStreamed(ctx, relation, ch, WithCascading(), WithBlocking())
})
streams[TableName(relation)] = ch
}
Expand Down Expand Up @@ -494,8 +494,33 @@ func (db *Database) DeleteStreamed(

// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via
// Options.MaxConnectionsPerTable.
//
// This sync process consists of the following steps:
//
// - It initially copies the first item from the specified stream and checks if this entity type provides relations.
// If so, it first traverses all these relations recursively, starts a separate goroutine for each of them, and
// caches the stream for each started goroutine/relation type.
//
// - After the relations have been resolved, another goroutine is started which consumes from the specified `entities`
// chan and performs the following actions for each of the streamed entities:
//
// - If the consumed entity doesn't satisfy the contracts.Entity interface, it will just forward that entity to the
// next stage.
//
// - When the entity does satisfy contracts.Entity, it applies the filter func on this entity (which is expected
// to check its checksums), and forwards the entity to the `forward` chan only if the filter function
// returns true, initiating a database upsert stream. Regardless of whether the function returns true, it will
// stream each of the child entities with the `relation.Stream()` method to the respective cached stream of the relation.
//
// However, when the first item doesn't satisfy the database.HasRelations interface, it will just use only two
// stages for the streamed entities to be upserted:
//
// - The first stage just consumes from the source stream (the `entities` chan) and applies the filter function (if any)
// to each of the entities. Entities for which the filter function returns false are not forwarded.
//
// - The second stage just performs database upsert queries for the entities that were forwarded from the previous one.
func (db *Database) UpsertStreamed(
ctx context.Context, entities <-chan interface{}, features ...Feature,
) error {
Expand All @@ -520,7 +545,7 @@ func (db *Database) UpsertStreamed(
defer runtime.HandleCrash()
defer close(ch)

return db.UpsertStreamed(ctx, ch)
return db.UpsertStreamed(ctx, ch, WithCascading(), WithPreExecution(with.preExecution))
})
streams[TableName(relation)] = ch
}
Expand All @@ -535,19 +560,30 @@ func (db *Database) UpsertStreamed(

for {
select {
case entity, more := <-source:
case e, more := <-source:
if !more {
return nil
}

select {
case forward <- entity:
case <-ctx.Done():
return ctx.Err()
entity, ok := e.(contracts.Entity)
shouldUpsert := true
if ok && with.preExecution != nil {
shouldUpsert, err = with.preExecution(entity)
if err != nil {
return err
}
}

if shouldUpsert {
select {
case forward <- e:
case <-ctx.Done():
return ctx.Err()
}
}

select {
case dup <- entity:
case dup <- e:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -591,8 +627,50 @@ func (db *Database) UpsertStreamed(
return g.Wait()
}

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[any], features...)
upsertEntities := make(chan interface{})
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer runtime.HandleCrash()
defer close(upsertEntities)

for {
select {
case <-ctx.Done():
return ctx.Err()
case e, ok := <-forward:
if !ok {
return nil
}

entity, ok := e.(contracts.Entity)
shouldUpsert := true
if ok && with.preExecution != nil {
shouldUpsert, err = with.preExecution(entity)
if err != nil {
return err
}
}

if shouldUpsert {
select {
case upsertEntities <- entity:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
})

g.Go(func() error {
defer runtime.HandleCrash()

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, com.NeverSplit[any], features...,
)
})

return g.Wait()
}

// YieldAll executes the query with the supplied scope,
Expand Down
16 changes: 13 additions & 3 deletions pkg/database/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package database

import (
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/contracts"
)

// Feature is a functional option that mutates a Features configuration.
type Feature func(*Features)

// PreExecFunc is a filter consulted for each entity before it is upserted;
// it reports whether the entity should be written to the database. A non-nil
// error aborts the ongoing sync. Registered via WithPreExecution.
type PreExecFunc func(contracts.Entity) (bool, error)

type Features struct {
blocking bool
cascading bool
onSuccess com.ProcessBulk[any]
blocking bool
cascading bool
onSuccess com.ProcessBulk[any]
preExecution PreExecFunc
}

func NewFeatures(features ...Feature) *Features {
Expand All @@ -33,6 +37,12 @@ func WithCascading() Feature {
}
}

// WithPreExecution returns a Feature that registers preExec as the filter
// function to be applied to entities prior to their database upsert.
func WithPreExecution(preExec PreExecFunc) Feature {
	apply := func(f *Features) {
		f.preExecution = preExec
	}

	return apply
}

func WithOnSuccess(fn com.ProcessBulk[any]) Feature {
return func(f *Features) {
f.onSuccess = fn
Expand Down
5 changes: 3 additions & 2 deletions pkg/schema/v1/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type Container struct {
Ready types.Bool
Started types.Bool
RestartCount int32
Devices []*ContainerDevice `db:"-"`
Mounts []*ContainerMount `db:"-"`
Logs string
Devices []*ContainerDevice `db:"-" hash:"-"`
Mounts []*ContainerMount `db:"-" hash:"-"`
}

func (c *Container) Relations() []database.Relation {
Expand Down
10 changes: 5 additions & 5 deletions pkg/schema/v1/daemon_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type DaemonSet struct {
UpdateNumberScheduled int32
NumberAvailable int32
NumberUnavailable int32
Conditions []*DaemonSetCondition `json:"-" db:"-"`
Labels []*Label `json:"-" db:"-"`
Conditions []*DaemonSetCondition `db:"-" hash:"-"`
Labels []*Label `db:"-" hash:"-"`
}

type DaemonSetMeta struct {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) {
d.NumberAvailable = daemonSet.Status.NumberAvailable
d.NumberUnavailable = daemonSet.Status.NumberUnavailable

d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d))
d.PropertiesChecksum = types.HashStruct(d)

for _, condition := range daemonSet.Status.Conditions {
daemonCond := &DaemonSetCondition{
Expand All @@ -80,15 +80,15 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) {
Reason: condition.Reason,
Message: condition.Message,
}
daemonCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(daemonCond))
daemonCond.PropertiesChecksum = types.HashStruct(daemonCond)

d.Conditions = append(d.Conditions, daemonCond)
}

for labelName, labelValue := range daemonSet.Labels {
label := NewLabel(labelName, labelValue)
label.DaemonSetId = d.Id
label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label))
label.PropertiesChecksum = types.HashStruct(label)

d.Labels = append(d.Labels, label)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/schema/v1/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Deployment struct {
ReadyReplicas int32
AvailableReplicas int32
UnavailableReplicas int32
Conditions []*DeploymentCondition `json:"-" db:"-"`
Labels []*Label `json:"-" db:"-"`
Conditions []*DeploymentCondition `db:"-" hash:"-"`
Labels []*Label `db:"-" hash:"-"`
}

type DeploymentConditionMeta struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) {
d.ReadyReplicas = deployment.Status.ReadyReplicas
d.UnavailableReplicas = deployment.Status.UnavailableReplicas

d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d))
d.PropertiesChecksum = types.HashStruct(d)

for _, condition := range deployment.Status.Conditions {
deploymentCond := &DeploymentCondition{
Expand All @@ -94,15 +94,15 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) {
Reason: condition.Reason,
Message: condition.Message,
}
deploymentCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(deploymentCond))
deploymentCond.PropertiesChecksum = types.HashStruct(deploymentCond)

d.Conditions = append(d.Conditions, deploymentCond)
}

for labelName, labelValue := range deployment.Labels {
label := NewLabel(labelName, labelValue)
label.DeploymentId = d.Id
label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label))
label.PropertiesChecksum = types.HashStruct(label)

d.Labels = append(d.Labels, label)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/schema/v1/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ func NewEvent() contracts.Entity {

func (e *Event) Obtain(k8s kmetav1.Object) {
e.ObtainMeta(k8s)
defer func() {
e.PropertiesChecksum = types.Checksum(MustMarshalJSON(e))
}()

event := k8s.(*keventsv1.Event)

Expand All @@ -56,6 +53,7 @@ func (e *Event) Obtain(k8s kmetav1.Object) {
e.LastSeen = types.UnixMilli(event.DeprecatedLastTimestamp.Time)
}
e.Count = event.DeprecatedCount
e.PropertiesChecksum = types.HashStruct(e)
// e.FirstSeen = types.UnixMilli(event.EventTime.Time)
// if event.Series != nil {
// e.LastSeen = types.UnixMilli(event.Series.LastObservedTime.Time)
Expand Down
6 changes: 3 additions & 3 deletions pkg/schema/v1/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type Namespace struct {
ResourceMeta
Phase string
Conditions []*NamespaceCondition `json:"-" db:"-"`
Conditions []*NamespaceCondition `db:"-" hash:"-"`
}

type NamespaceMeta struct {
Expand Down Expand Up @@ -49,7 +49,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) {
n.Id = types.Checksum(namespace.Name)
n.Phase = strings.ToLower(string(namespace.Status.Phase))

n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n))
n.PropertiesChecksum = types.HashStruct(n)

for _, condition := range namespace.Status.Conditions {
namespaceCond := &NamespaceCondition{
Expand All @@ -63,7 +63,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) {
Reason: condition.Reason,
Message: condition.Message,
}
namespaceCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(namespaceCond))
namespaceCond.PropertiesChecksum = types.HashStruct(namespaceCond)

n.Conditions = append(n.Conditions, namespaceCond)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/schema/v1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type Node struct {
MemoryCapacity int64
MemoryAllocatable int64
PodCapacity int64
Conditions []*NodeCondition `json:"-" db:"-"`
Volumes []*NodeVolume `json:"-" db:"-"`
Conditions []*NodeCondition `db:"-" hash:"-"`
Volumes []*NodeVolume `db:"-" hash:"-"`
}

type NodeMeta struct {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) {
n.MemoryAllocatable = node.Status.Allocatable.Memory().MilliValue()
n.PodCapacity = node.Status.Allocatable.Pods().Value()

n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n))
n.PropertiesChecksum = types.HashStruct(n)

for _, condition := range node.Status.Conditions {
nodeCond := &NodeCondition{
Expand All @@ -102,7 +102,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) {
Reason: condition.Reason,
Message: condition.Message,
}
nodeCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeCond))
nodeCond.PropertiesChecksum = types.HashStruct(nodeCond)

n.Conditions = append(n.Conditions, nodeCond)
}
Expand All @@ -125,7 +125,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) {
Valid: true,
},
}
nodeVolume.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeVolume))
nodeVolume.PropertiesChecksum = types.HashStruct(nodeVolume)

n.Volumes = append(n.Volumes, nodeVolume)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/schema/v1/persistent_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type PersistentVolume struct {
Phase string
Reason string
Message string
Claim *PersistentVolumeClaimRef `json:"-" db:"-"`
Claim *PersistentVolumeClaimRef `db:"-" hash:"-"`
}

type PersistentVolumeMeta struct {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) {
panic(err)
}

p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p))
p.PropertiesChecksum = types.HashStruct(p)

p.Claim = &PersistentVolumeClaimRef{
PersistentVolumeMeta: PersistentVolumeMeta{
Expand All @@ -86,7 +86,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) {
Name: persistentVolume.Spec.ClaimRef.Name,
Uid: persistentVolume.Spec.ClaimRef.UID,
}
p.Claim.PropertiesChecksum = types.Checksum(MustMarshalJSON(p.Claim))
p.Claim.PropertiesChecksum = types.HashStruct(p.Claim)
}

func (p *PersistentVolume) Relations() []database.Relation {
Expand Down

0 comments on commit 4324dc7

Please sign in to comment.