diff --git a/pkg/contracts/contracts.go b/pkg/contracts/contracts.go index 1111005..6f73499 100644 --- a/pkg/contracts/contracts.go +++ b/pkg/contracts/contracts.go @@ -51,7 +51,7 @@ type Resource interface { type Meta struct { Id types.Binary `db:"id"` - PropertiesChecksum types.Binary `json:"-"` + PropertiesChecksum types.Binary `hash:"-"` } func (m *Meta) Checksum() types.Binary { diff --git a/pkg/database/database.go b/pkg/database/database.go index 9b3ff94..868f830 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -406,7 +406,7 @@ func (db *Database) DeleteStreamed( defer runtime.HandleCrash() defer close(ch) - return db.DeleteStreamed(ctx, relation, ch, features...) + return db.DeleteStreamed(ctx, relation, ch, WithCascading(), WithBlocking()) }) streams[TableName(relation)] = ch } @@ -494,8 +494,33 @@ func (db *Database) DeleteStreamed( // UpsertStreamed bulk upserts the specified entities via NamedBulkExec. // The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. -// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and -// concurrency is controlled via Options.MaxConnectionsPerTable. +// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via +// Options.MaxConnectionsPerTable. +// +// The upsert process consists of the following steps: +// +// - It initially copies the first item from the specified stream and checks whether this entity type provides relations. +// If so, it traverses all these relations recursively, starts a separate goroutine for each of them and caches +// the stream of each started goroutine by its relation type. +// +// - After the relations have been resolved, another goroutine is started which consumes from the specified `entities` +// chan and performs the following actions for each of the streamed entities: +// +// - If the consumed entity doesn't satisfy the contracts.Entity interface, it just forwards that entity to the +// next stage. +// +// - When the entity does satisfy contracts.Entity, it applies the filter func to this entity (which is expected +// to check the entity's checksum) and forwards the entity to the `forward` chan, which feeds the database upsert +// stream, only if the filter function returns true. Regardless of whether the function returns true, it streams +// each of the child entities to the respective cached stream of the relation via the `relation.Stream()` method. +// +// However, when the first item doesn't satisfy the database.HasRelations interface, only two stages are used +// to upsert the streamed entities: +// +// - The first stage consumes from the source stream (the `entities` chan) and applies the filter function (if any) +// to each of the entities. Entities for which the filter function doesn't return true are not forwarded. +// +// - The second stage performs the database upsert queries for the entities forwarded from the previous one.
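Illustrative sketch, not part of this patch: how a caller can pair UpsertStreamed with the pre-execution filter added in pkg/database/features.go below. The previousChecksums map is a hypothetical stand-in for a real state cache (pkg/sync/v1 uses its cache.Store for the same purpose), and upsertChanged is not an existing function.

package example

import (
	"context"
	"sync"

	"github.com/icinga/icinga-kubernetes/pkg/contracts"
	"github.com/icinga/icinga-kubernetes/pkg/database"
	"github.com/icinga/icinga-kubernetes/pkg/types"
)

// upsertChanged streams entities into the database but skips those whose checksum hasn't changed.
func upsertChanged(ctx context.Context, db *database.Database, entities <-chan interface{}) error {
	var mu sync.Mutex
	previousChecksums := map[string]types.Binary{} // hypothetical in-memory state cache keyed by entity ID

	filter := func(entity contracts.Entity) (bool, error) {
		mu.Lock()
		defer mu.Unlock()

		id := entity.ID().String()
		if last, ok := previousChecksums[id]; ok && entity.Checksum().Equal(last) {
			return false, nil // checksum unchanged, skip the upsert
		}
		previousChecksums[id] = entity.Checksum()

		return true, nil
	}

	return db.UpsertStreamed(ctx, entities, database.WithCascading(), database.WithPreExecution(filter))
}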
func (db *Database) UpsertStreamed( ctx context.Context, entities <-chan interface{}, features ...Feature, ) error { @@ -520,7 +545,7 @@ func (db *Database) UpsertStreamed( defer runtime.HandleCrash() defer close(ch) - return db.UpsertStreamed(ctx, ch) + return db.UpsertStreamed(ctx, ch, WithCascading(), WithPreExecution(with.preExecution)) }) streams[TableName(relation)] = ch } @@ -535,19 +560,30 @@ func (db *Database) UpsertStreamed( for { select { - case entity, more := <-source: + case e, more := <-source: if !more { return nil } - select { - case forward <- entity: - case <-ctx.Done(): - return ctx.Err() + entity, ok := e.(contracts.Entity) + shouldUpsert := true + if ok && with.preExecution != nil { + shouldUpsert, err = with.preExecution(entity) + if err != nil { + return err + } + } + + if shouldUpsert { + select { + case forward <- e: + case <-ctx.Done(): + return ctx.Err() + } } select { - case dup <- entity: + case dup <- e: case <-ctx.Done(): return ctx.Err() } @@ -591,8 +627,50 @@ func (db *Database) UpsertStreamed( return g.Wait() } - return db.NamedBulkExec( - ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, forward, com.NeverSplit[any], features...) + upsertEntities := make(chan interface{}) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(upsertEntities) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e, ok := <-forward: + if !ok { + return nil + } + + entity, ok := e.(contracts.Entity) + shouldUpsert := true + if ok && with.preExecution != nil { + shouldUpsert, err = with.preExecution(entity) + if err != nil { + return err + } + } + + if shouldUpsert { + select { + case upsertEntities <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + } + } + }) + + g.Go(func() error { + defer runtime.HandleCrash() + + return db.NamedBulkExec( + ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem, upsertEntities, com.NeverSplit[any], features..., + ) + }) + + return g.Wait() } // YieldAll executes the query with the supplied scope, diff --git a/pkg/database/features.go b/pkg/database/features.go index cafb26a..54ddcf8 100644 --- a/pkg/database/features.go +++ b/pkg/database/features.go @@ -2,14 +2,18 @@ package database import ( "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/contracts" ) type Feature func(*Features) +type PreExecFunc func(contracts.Entity) (bool, error) + type Features struct { - blocking bool - cascading bool - onSuccess com.ProcessBulk[any] + blocking bool + cascading bool + onSuccess com.ProcessBulk[any] + preExecution PreExecFunc } func NewFeatures(features ...Feature) *Features { @@ -33,6 +37,12 @@ func WithCascading() Feature { } } +func WithPreExecution(preExec PreExecFunc) Feature { + return func(f *Features) { + f.preExecution = preExec + } +} + func WithOnSuccess(fn com.ProcessBulk[any]) Feature { return func(f *Features) { f.onSuccess = fn diff --git a/pkg/schema/v1/container.go b/pkg/schema/v1/container.go index 2e41c2f..067f24e 100644 --- a/pkg/schema/v1/container.go +++ b/pkg/schema/v1/container.go @@ -20,8 +20,9 @@ type Container struct { Ready types.Bool Started types.Bool RestartCount int32 - Devices []*ContainerDevice `db:"-"` - Mounts []*ContainerMount `db:"-"` + Logs string + Devices []*ContainerDevice `db:"-" hash:"-"` + Mounts []*ContainerMount `db:"-" hash:"-"` } func (c *Container) Relations() []database.Relation { diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index 6dff224..860835c 
100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -20,8 +20,8 @@ type DaemonSet struct { UpdateNumberScheduled int32 NumberAvailable int32 NumberUnavailable int32 - Conditions []*DaemonSetCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*DaemonSetCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type DaemonSetMeta struct { @@ -66,7 +66,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { d.NumberAvailable = daemonSet.Status.NumberAvailable d.NumberUnavailable = daemonSet.Status.NumberUnavailable - d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + d.PropertiesChecksum = types.HashStruct(d) for _, condition := range daemonSet.Status.Conditions { daemonCond := &DaemonSetCondition{ @@ -80,7 +80,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - daemonCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(daemonCond)) + daemonCond.PropertiesChecksum = types.HashStruct(daemonCond) d.Conditions = append(d.Conditions, daemonCond) } @@ -88,7 +88,7 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range daemonSet.Labels { label := NewLabel(labelName, labelValue) label.DaemonSetId = d.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) d.Labels = append(d.Labels, label) } diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index 764d0e4..a7d7dbd 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -21,8 +21,8 @@ type Deployment struct { ReadyReplicas int32 AvailableReplicas int32 UnavailableReplicas int32 - Conditions []*DeploymentCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*DeploymentCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type DeploymentConditionMeta struct { @@ -79,7 +79,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { d.ReadyReplicas = deployment.Status.ReadyReplicas d.UnavailableReplicas = deployment.Status.UnavailableReplicas - d.PropertiesChecksum = types.Checksum(MustMarshalJSON(d)) + d.PropertiesChecksum = types.HashStruct(d) for _, condition := range deployment.Status.Conditions { deploymentCond := &DeploymentCondition{ @@ -94,7 +94,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - deploymentCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(deploymentCond)) + deploymentCond.PropertiesChecksum = types.HashStruct(deploymentCond) d.Conditions = append(d.Conditions, deploymentCond) } @@ -102,7 +102,7 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range deployment.Labels { label := NewLabel(labelName, labelValue) label.DeploymentId = d.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) d.Labels = append(d.Labels, label) } diff --git a/pkg/schema/v1/event.go b/pkg/schema/v1/event.go index f21a4aa..1a962ea 100644 --- a/pkg/schema/v1/event.go +++ b/pkg/schema/v1/event.go @@ -29,9 +29,6 @@ func NewEvent() contracts.Entity { func (e *Event) Obtain(k8s kmetav1.Object) { e.ObtainMeta(k8s) - defer func() { - e.PropertiesChecksum = types.Checksum(MustMarshalJSON(e)) - }() event := k8s.(*keventsv1.Event) @@ -56,6 +53,7 @@ func (e *Event) Obtain(k8s kmetav1.Object) { e.LastSeen = types.UnixMilli(event.DeprecatedLastTimestamp.Time) } e.Count = event.DeprecatedCount + 
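(Editorial note: the pattern repeated throughout pkg/schema — replacing types.Checksum(MustMarshalJSON(x)) with types.HashStruct(x) — relies on the new hasher in pkg/types/objectpacker.go further down, which skips embedded structs, anonymous and unexported fields as well as anything tagged `hash:"-"`; that is why the relation slices now carry that tag instead of `json:"-"`.)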
e.PropertiesChecksum = types.HashStruct(e) // e.FirstSeen = types.UnixMilli(event.EventTime.Time) // if event.Series != nil { // e.LastSeen = types.UnixMilli(event.Series.LastObservedTime.Time) diff --git a/pkg/schema/v1/namespace.go b/pkg/schema/v1/namespace.go index 77c3d94..21cc3b2 100644 --- a/pkg/schema/v1/namespace.go +++ b/pkg/schema/v1/namespace.go @@ -12,7 +12,7 @@ import ( type Namespace struct { ResourceMeta Phase string - Conditions []*NamespaceCondition `json:"-" db:"-"` + Conditions []*NamespaceCondition `db:"-" hash:"-"` } type NamespaceMeta struct { @@ -49,7 +49,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) { n.Id = types.Checksum(namespace.Name) n.Phase = strings.ToLower(string(namespace.Status.Phase)) - n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + n.PropertiesChecksum = types.HashStruct(n) for _, condition := range namespace.Status.Conditions { namespaceCond := &NamespaceCondition{ @@ -63,7 +63,7 @@ func (n *Namespace) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - namespaceCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(namespaceCond)) + namespaceCond.PropertiesChecksum = types.HashStruct(namespaceCond) n.Conditions = append(n.Conditions, namespaceCond) } diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 648d1f5..914b51c 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -22,8 +22,8 @@ type Node struct { MemoryCapacity int64 MemoryAllocatable int64 PodCapacity int64 - Conditions []*NodeCondition `json:"-" db:"-"` - Volumes []*NodeVolume `json:"-" db:"-"` + Conditions []*NodeCondition `db:"-" hash:"-"` + Volumes []*NodeVolume `db:"-" hash:"-"` } type NodeMeta struct { @@ -87,7 +87,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { n.MemoryAllocatable = node.Status.Allocatable.Memory().MilliValue() n.PodCapacity = node.Status.Allocatable.Pods().Value() - n.PropertiesChecksum = types.Checksum(MustMarshalJSON(n)) + n.PropertiesChecksum = types.HashStruct(n) for _, condition := range node.Status.Conditions { nodeCond := &NodeCondition{ @@ -102,7 +102,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - nodeCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeCond)) + nodeCond.PropertiesChecksum = types.HashStruct(nodeCond) n.Conditions = append(n.Conditions, nodeCond) } @@ -125,7 +125,7 @@ func (n *Node) Obtain(k8s kmetav1.Object) { Valid: true, }, } - nodeVolume.PropertiesChecksum = types.Checksum(MustMarshalJSON(nodeVolume)) + nodeVolume.PropertiesChecksum = types.HashStruct(nodeVolume) n.Volumes = append(n.Volumes, nodeVolume) } diff --git a/pkg/schema/v1/persistent_volume.go b/pkg/schema/v1/persistent_volume.go index 3da1172..bfb1315 100644 --- a/pkg/schema/v1/persistent_volume.go +++ b/pkg/schema/v1/persistent_volume.go @@ -23,7 +23,7 @@ type PersistentVolume struct { Phase string Reason string Message string - Claim *PersistentVolumeClaimRef `json:"-" db:"-"` + Claim *PersistentVolumeClaimRef `db:"-" hash:"-"` } type PersistentVolumeMeta struct { @@ -75,7 +75,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { panic(err) } - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) p.Claim = &PersistentVolumeClaimRef{ PersistentVolumeMeta: PersistentVolumeMeta{ @@ -86,7 +86,7 @@ func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { Name: persistentVolume.Spec.ClaimRef.Name, Uid: persistentVolume.Spec.ClaimRef.UID, } - p.Claim.PropertiesChecksum = 
types.Checksum(MustMarshalJSON(p.Claim)) + p.Claim.PropertiesChecksum = types.HashStruct(p.Claim) } func (p *PersistentVolume) Relations() []database.Relation { diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index 960afbb..4f81e59 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -1,11 +1,15 @@ package v1 import ( + "context" "database/sql" + "fmt" "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/strcase" "github.com/icinga/icinga-kubernetes/pkg/types" + "golang.org/x/sync/errgroup" + "io" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" @@ -30,12 +34,12 @@ type Pod struct { Message string Qos string RestartPolicy string - Conditions []*PodCondition `db:"-"` - Containers []*Container `db:"-"` - Owners []*PodOwner `db:"-"` - Labels []*Label `db:"-"` - Pvcs []*PodPvc `db:"-"` - Volumes []*PodVolume `db:"-"` + Conditions []*PodCondition `db:"-" hash:"-"` + Containers []*Container `db:"-" hash:"-"` + Owners []*PodOwner `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` + Pvcs []*PodPvc `db:"-" hash:"-"` + Volumes []*PodVolume `db:"-" hash:"-"` factory *PodFactory } @@ -98,7 +102,7 @@ func (f *PodFactory) New() contracts.Entity { func (p *Pod) Obtain(k8s kmetav1.Object) { p.ObtainMeta(k8s) defer func() { - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) }() pod := k8s.(*kcorev1.Pod) @@ -113,200 +117,229 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { p.Qos = strcase.Snake(string(pod.Status.QOSClass)) p.RestartPolicy = strcase.Snake(string(pod.Spec.RestartPolicy)) - for _, condition := range pod.Status.Conditions { - podCond := &PodCondition{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, - }, - Type: string(condition.Type), - Status: string(condition.Status), - LastProbe: types.UnixMilli(condition.LastProbeTime.Time), - LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), - Reason: condition.Reason, - Message: condition.Message, - } - podCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(podCond)) - - p.Conditions = append(p.Conditions, podCond) - } + g := &errgroup.Group{} + g.Go(func() error { + for _, condition := range pod.Status.Conditions { + podCond := &PodCondition{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, condition.Type))}, + }, + Type: string(condition.Type), + Status: string(condition.Status), + LastProbe: types.UnixMilli(condition.LastProbeTime.Time), + LastTransition: types.UnixMilli(condition.LastTransitionTime.Time), + Reason: condition.Reason, + Message: condition.Message, + } + podCond.PropertiesChecksum = types.HashStruct(podCond) - containerStatuses := make(map[string]kcorev1.ContainerStatus, len(pod.Spec.Containers)) - for _, containerStatus := range pod.Status.ContainerStatuses { - containerStatuses[containerStatus.Name] = containerStatus - } - for _, k8sContainer := range pod.Spec.Containers { - var started bool - if containerStatuses[k8sContainer.Name].Started != nil { - started = *containerStatuses[k8sContainer.Name].Started - } - state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[k8sContainer.Name].State) - if err != nil { - panic(err) + p.Conditions = append(p.Conditions, podCond) } - var containerState sql.NullString - if state != "" { - 
containerState.String = strcase.Snake(state) - containerState.Valid = true - } + return nil + }) - container := &Container{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + k8sContainer.Name)}, - }, - Name: k8sContainer.Name, - Image: k8sContainer.Image, - CpuLimits: k8sContainer.Resources.Limits.Cpu().MilliValue(), - CpuRequests: k8sContainer.Resources.Requests.Cpu().MilliValue(), - MemoryLimits: k8sContainer.Resources.Limits.Memory().MilliValue(), - MemoryRequests: k8sContainer.Resources.Requests.Memory().MilliValue(), - Ready: types.Bool{ - Bool: containerStatuses[k8sContainer.Name].Ready, - Valid: true, - }, - Started: types.Bool{ - Bool: started, - Valid: true, - }, - RestartCount: containerStatuses[k8sContainer.Name].RestartCount, - State: containerState, - StateDetails: stateDetails, + g.Go(func() error { + containerStatuses := make(map[string]kcorev1.ContainerStatus, len(pod.Spec.Containers)) + for _, containerStatus := range pod.Status.ContainerStatuses { + containerStatuses[containerStatus.Name] = containerStatus } - container.PropertiesChecksum = types.Checksum(MustMarshalJSON(container)) - - p.CpuLimits += k8sContainer.Resources.Limits.Cpu().MilliValue() - p.CpuRequests += k8sContainer.Resources.Requests.Cpu().MilliValue() - p.MemoryLimits += k8sContainer.Resources.Limits.Memory().MilliValue() - p.MemoryRequests += k8sContainer.Resources.Requests.Memory().MilliValue() - - for _, device := range k8sContainer.VolumeDevices { - cd := &ContainerDevice{ - ContainerMeta: ContainerMeta{ - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, device.Name))}, - ContainerId: container.Id, - }, - Name: device.Name, - Path: device.DevicePath, + for _, k8sContainer := range pod.Spec.Containers { + var started bool + if containerStatuses[k8sContainer.Name].Started != nil { + started = *containerStatuses[k8sContainer.Name].Started + } + state, stateDetails, err := MarshalFirstNonNilStructFieldToJSON(containerStatuses[k8sContainer.Name].State) + if err != nil { + panic(err) } - cd.PropertiesChecksum = types.Checksum(MustMarshalJSON(cd)) - container.Devices = append(container.Devices, cd) - } + logs, err := getContainerLogs(p.factory.clientset, pod, k8sContainer) + if err != nil { + // ContainerCreating, NotFound, ... 
+ fmt.Println(err) + logs = "" + } - for _, mount := range k8sContainer.VolumeMounts { - cm := &ContainerMount{ - ContainerMeta: ContainerMeta{ - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, mount.Name))}, - ContainerId: container.Id, + var containerState sql.NullString + if state != "" { + containerState.String = strcase.Snake(state) + containerState.Valid = true + } + + container := &Container{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(pod.Namespace + "/" + pod.Name + "/" + k8sContainer.Name)}, }, - VolumeName: mount.Name, - Path: mount.MountPath, - SubPath: mount.SubPath, - ReadOnly: types.Bool{ - Bool: mount.ReadOnly, + Name: k8sContainer.Name, + Image: k8sContainer.Image, + CpuLimits: k8sContainer.Resources.Limits.Cpu().MilliValue(), + CpuRequests: k8sContainer.Resources.Requests.Cpu().MilliValue(), + MemoryLimits: k8sContainer.Resources.Limits.Memory().MilliValue(), + MemoryRequests: k8sContainer.Resources.Requests.Memory().MilliValue(), + Logs: logs, + Ready: types.Bool{ + Bool: containerStatuses[k8sContainer.Name].Ready, Valid: true, }, + Started: types.Bool{ + Bool: started, + Valid: true, + }, + RestartCount: containerStatuses[k8sContainer.Name].RestartCount, + State: containerState, + StateDetails: stateDetails, + } + container.PropertiesChecksum = types.HashStruct(container) + p.Containers = append(p.Containers, container) + + p.CpuLimits += k8sContainer.Resources.Limits.Cpu().MilliValue() + p.CpuRequests += k8sContainer.Resources.Requests.Cpu().MilliValue() + p.MemoryLimits += k8sContainer.Resources.Limits.Memory().MilliValue() + p.MemoryRequests += k8sContainer.Resources.Requests.Memory().MilliValue() + + for _, device := range k8sContainer.VolumeDevices { + cd := &ContainerDevice{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, device.Name))}, + ContainerId: container.Id, + }, + Name: device.Name, + Path: device.DevicePath, + } + cd.PropertiesChecksum = types.HashStruct(cd) + + container.Devices = append(container.Devices, cd) } - cm.PropertiesChecksum = types.Checksum(MustMarshalJSON(cm)) - container.Mounts = append(container.Mounts, cm) + for _, mount := range k8sContainer.VolumeMounts { + cm := &ContainerMount{ + ContainerMeta: ContainerMeta{ + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, container.Id, mount.Name))}, + ContainerId: container.Id, + }, + VolumeName: mount.Name, + Path: mount.MountPath, + SubPath: mount.SubPath, + ReadOnly: types.Bool{ + Bool: mount.ReadOnly, + Valid: true, + }, + } + cm.PropertiesChecksum = types.HashStruct(cm) + + container.Mounts = append(container.Mounts, cm) + } } - } - for labelName, labelValue := range pod.Labels { - label := NewLabel(labelName, labelValue) - label.PodId = p.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + return nil + }) - p.Labels = append(p.Labels, label) - } + g.Go(func() error { + for labelName, labelValue := range pod.Labels { + label := NewLabel(labelName, labelValue) + label.PodId = p.Id + label.PropertiesChecksum = types.HashStruct(label) - for _, ownerReference := range pod.OwnerReferences { - var blockOwnerDeletion, controller bool - if ownerReference.BlockOwnerDeletion != nil { - blockOwnerDeletion = *ownerReference.BlockOwnerDeletion - } - if ownerReference.Controller != nil { - controller = *ownerReference.Controller + p.Labels = append(p.Labels, label) } - owner := &PodOwner{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: 
contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, ownerReference.UID))}, - }, - Kind: strcase.Snake(ownerReference.Kind), - Name: ownerReference.Name, - Uid: ownerReference.UID, - BlockOwnerDeletion: types.Bool{ - Bool: blockOwnerDeletion, - Valid: true, - }, - Controller: types.Bool{ - Bool: controller, - Valid: true, - }, - } - owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) - - p.Owners = append(p.Owners, owner) - } - // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#resources - for _, container := range pod.Spec.InitContainers { - // Init container must complete successfully before the next one starts, - // so we don't have to sum their resources. - p.CpuLimits = types.MaxInt(p.CpuLimits, container.Resources.Limits.Cpu().MilliValue()) - p.CpuRequests = types.MaxInt(p.CpuRequests, container.Resources.Requests.Cpu().MilliValue()) - p.MemoryLimits = types.MaxInt(p.MemoryLimits, container.Resources.Limits.Memory().MilliValue()) - p.MemoryRequests = types.MaxInt(p.MemoryRequests, container.Resources.Requests.Memory().MilliValue()) - } - - for _, volume := range pod.Spec.Volumes { - if volume.PersistentVolumeClaim != nil { - pvc := &PodPvc{ + for _, ownerReference := range pod.OwnerReferences { + var blockOwnerDeletion, controller bool + if ownerReference.BlockOwnerDeletion != nil { + blockOwnerDeletion = *ownerReference.BlockOwnerDeletion + } + if ownerReference.Controller != nil { + controller = *ownerReference.Controller + } + owner := &PodOwner{ PodMeta: PodMeta{ PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name, volume.PersistentVolumeClaim.ClaimName))}, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, ownerReference.UID))}, }, - VolumeName: volume.Name, - ClaimName: volume.PersistentVolumeClaim.ClaimName, - ReadOnly: types.Bool{ - Bool: volume.PersistentVolumeClaim.ReadOnly, + Kind: strcase.Snake(ownerReference.Kind), + Name: ownerReference.Name, + Uid: ownerReference.UID, + BlockOwnerDeletion: types.Bool{ + Bool: blockOwnerDeletion, + Valid: true, + }, + Controller: types.Bool{ + Bool: controller, Valid: true, }, } - pvc.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvc)) + owner.PropertiesChecksum = types.HashStruct(owner) - p.Pvcs = append(p.Pvcs, pvc) - } else { - t, source, err := MarshalFirstNonNilStructFieldToJSON(volume.VolumeSource) - if err != nil { - panic(err) - } + p.Owners = append(p.Owners, owner) + } - vol := &PodVolume{ - PodMeta: PodMeta{ - PodId: p.Id, - Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name))}, - }, - VolumeName: volume.Name, - Type: t, - Source: source, - } - vol.PropertiesChecksum = types.Checksum(MustMarshalJSON(vol)) + return nil + }) + + g.Go(func() error { + // https://kubernetes.io/docs/concepts/workloads/pods/init-containers/#resources + for _, container := range pod.Spec.InitContainers { + // Init container must complete successfully before the next one starts, + // so we don't have to sum their resources. 
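(Editorial note: with this max-based accounting, a pod whose app containers request 200m + 300m CPU while its init container requests 700m ends up with an effective request of max(500m, 700m) = 700m, which matches the effective-resource rules described in the Kubernetes documentation linked above.)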
+ p.CpuLimits = types.MaxInt(p.CpuLimits, container.Resources.Limits.Cpu().MilliValue()) + p.CpuRequests = types.MaxInt(p.CpuRequests, container.Resources.Requests.Cpu().MilliValue()) + p.MemoryLimits = types.MaxInt(p.MemoryLimits, container.Resources.Limits.Memory().MilliValue()) + p.MemoryRequests = types.MaxInt(p.MemoryRequests, container.Resources.Requests.Memory().MilliValue()) + } - p.Volumes = append(p.Volumes, vol) + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + pvc := &PodPvc{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name, volume.PersistentVolumeClaim.ClaimName))}, + }, + VolumeName: volume.Name, + ClaimName: volume.PersistentVolumeClaim.ClaimName, + ReadOnly: types.Bool{ + Bool: volume.PersistentVolumeClaim.ReadOnly, + Valid: true, + }, + } + pvc.PropertiesChecksum = types.HashStruct(pvc) + + p.Pvcs = append(p.Pvcs, pvc) + } else { + t, source, err := MarshalFirstNonNilStructFieldToJSON(volume.VolumeSource) + if err != nil { + panic(err) + } + + vol := &PodVolume{ + PodMeta: PodMeta{ + PodId: p.Id, + Meta: contracts.Meta{Id: types.Checksum(types.MustPackSlice(p.Id, volume.Name))}, + }, + VolumeName: volume.Name, + Type: t, + Source: source, + } + vol.PropertiesChecksum = types.HashStruct(vol) + + p.Volumes = append(p.Volumes, vol) + } } - } + + return nil + }) + + _ = g.Wait() // We don't expect any errors here } func (p *Pod) Relations() []database.Relation { fk := database.WithForeignKey("pod_id") return []database.Relation{ - database.HasMany(p.Containers, fk, database.WithoutCascadeDelete()), + database.HasMany(p.Conditions, fk), + database.HasMany(p.Containers, fk), database.HasMany(p.Owners, fk), database.HasMany(p.Labels, fk), database.HasMany(p.Pvcs, fk), @@ -314,6 +347,21 @@ func (p *Pod) Relations() []database.Relation { } } +func getContainerLogs(clientset *kubernetes.Clientset, pod *kcorev1.Pod, container kcorev1.Container) (string, error) { + req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &kcorev1.PodLogOptions{Container: container.Name}) + body, err := req.Stream(context.TODO()) + if err != nil { + return "", err + } + defer body.Close() + logs, err := io.ReadAll(body) + if err != nil { + return "", err + } + + return string(logs), nil +} + var ( _ contracts.Entity = (*Pod)(nil) _ contracts.Resource = (*Pod)(nil) diff --git a/pkg/schema/v1/pvc.go b/pkg/schema/v1/pvc.go index 84b20dd..160a3cd 100644 --- a/pkg/schema/v1/pvc.go +++ b/pkg/schema/v1/pvc.go @@ -41,8 +41,8 @@ type Pvc struct { VolumeName string VolumeMode sql.NullString StorageClass sql.NullString - Conditions []*PvcCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*PvcCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type PvcMeta struct { @@ -102,7 +102,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { } } - p.PropertiesChecksum = types.Checksum(MustMarshalJSON(p)) + p.PropertiesChecksum = types.HashStruct(p) for _, condition := range pvc.Status.Conditions { pvcCond := &PvcCondition{ @@ -117,7 +117,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - pvcCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(pvcCond)) + pvcCond.PropertiesChecksum = types.HashStruct(pvcCond) p.Conditions = append(p.Conditions, pvcCond) } @@ -125,7 +125,7 @@ func (p *Pvc) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range pvc.Labels { label := NewLabel(labelName, labelValue) label.PvcId = 
p.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) p.Labels = append(p.Labels, label) } diff --git a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index bae3de9..7ffd515 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -18,9 +18,9 @@ type ReplicaSet struct { FullyLabeledReplicas int32 ReadyReplicas int32 AvailableReplicas int32 - Conditions []*ReplicaSetCondition `json:"-" db:"-"` - Owners []*ReplicaSetOwner `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*ReplicaSetCondition `hash:"-" db:"-"` + Owners []*ReplicaSetOwner `hash:"-" db:"-"` + Labels []*Label `hash:"-" db:"-"` } type ReplicaSetMeta struct { @@ -75,7 +75,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { r.ReadyReplicas = replicaSet.Status.ReadyReplicas r.AvailableReplicas = replicaSet.Status.AvailableReplicas - r.PropertiesChecksum = types.Checksum(MustMarshalJSON(r)) + r.PropertiesChecksum = types.HashStruct(r) for _, condition := range replicaSet.Status.Conditions { replicaSetCond := &ReplicaSetCondition{ @@ -89,7 +89,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - replicaSetCond.PropertiesChecksum = types.Checksum(MustMarshalJSON(replicaSetCond)) + replicaSetCond.PropertiesChecksum = types.HashStruct(replicaSetCond) r.Conditions = append(r.Conditions, replicaSetCond) } @@ -120,7 +120,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { Valid: true, }, } - owner.PropertiesChecksum = types.Checksum(MustMarshalJSON(owner)) + owner.PropertiesChecksum = types.HashStruct(owner) r.Owners = append(r.Owners, owner) } @@ -128,7 +128,7 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range replicaSet.Labels { label := NewLabel(labelName, labelValue) label.ReplicaSetId = r.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) r.Labels = append(r.Labels, label) } diff --git a/pkg/schema/v1/service.go b/pkg/schema/v1/service.go index 3e78b5c..733c786 100644 --- a/pkg/schema/v1/service.go +++ b/pkg/schema/v1/service.go @@ -25,5 +25,5 @@ func (s *Service) Obtain(k8s kmetav1.Object) { s.Id = types.Checksum(s.Namespace + "/" + s.Name) s.Type = string(service.Spec.Type) s.ClusterIP = service.Spec.ClusterIP - s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) + s.PropertiesChecksum = types.HashStruct(s) } diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index 55719c9..78ebc12 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -24,8 +24,8 @@ type StatefulSet struct { CurrentReplicas int32 UpdatedReplicas int32 AvailableReplicas int32 - Conditions []*StatefulSetCondition `json:"-" db:"-"` - Labels []*Label `json:"-" db:"-"` + Conditions []*StatefulSetCondition `db:"-" hash:"-"` + Labels []*Label `db:"-" hash:"-"` } type StatefulSetMeta struct { @@ -89,7 +89,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { s.UpdatedReplicas = statefulSet.Status.UpdatedReplicas s.AvailableReplicas = statefulSet.Status.AvailableReplicas - s.PropertiesChecksum = types.Checksum(MustMarshalJSON(s)) + s.PropertiesChecksum = types.HashStruct(s) for _, condition := range statefulSet.Status.Conditions { cond := &StatefulSetCondition{ @@ -103,7 +103,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { Reason: condition.Reason, Message: condition.Message, } - cond.PropertiesChecksum = 
types.Checksum(MustMarshalJSON(cond)) + cond.PropertiesChecksum = types.HashStruct(cond) s.Conditions = append(s.Conditions, cond) } @@ -111,7 +111,7 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { for labelName, labelValue := range statefulSet.Labels { label := NewLabel(labelName, labelValue) label.StatefulSetId = s.Id - label.PropertiesChecksum = types.Checksum(MustMarshalJSON(label)) + label.PropertiesChecksum = types.HashStruct(label) s.Labels = append(s.Labels, label) } diff --git a/pkg/schema/v1/utils.go b/pkg/schema/v1/utils.go index b50c6a1..e9d08e5 100644 --- a/pkg/schema/v1/utils.go +++ b/pkg/schema/v1/utils.go @@ -21,16 +21,3 @@ func MarshalFirstNonNilStructFieldToJSON(i any) (string, string, error) { return "", "", nil } - -// MustMarshalJSON json encodes the given object. -// TODO: This is just used to generate the checksum of the object properties. -// - This should no longer be necessary once we have implemented a more sophisticated -// - method for hashing a structure. -func MustMarshalJSON(v interface{}) []byte { - b, err := types.MarshalJSON(v) - if err != nil { - panic(err) - } - - return b -} diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index 7293833..cd89cf9 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -2,36 +2,40 @@ package v1 import ( "context" - "fmt" "github.com/go-logr/logr" "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/database" - schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "github.com/icinga/icinga-kubernetes/pkg/sync" "github.com/icinga/icinga-kubernetes/pkg/types" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" + "reflect" ) +type FactoryFunc func() contracts.Entity + type Sync struct { db *database.Database informer cache.SharedIndexInformer log logr.Logger - factory func() schemav1.Resource + factory FactoryFunc + store cache.Store } func NewSync( db *database.Database, informer cache.SharedIndexInformer, log logr.Logger, - factory func() schemav1.Resource, + factory FactoryFunc, ) *Sync { return &Sync{ db: db, informer: informer, log: log, factory: factory, + store: cache.NewStore(ObjectMetaKeyFunc), } } @@ -42,19 +46,61 @@ func (s *Sync) Run(ctx context.Context, features ...sync.Feature) error { if !with.NoWarmup() { if err := s.warmup(ctx, controller); err != nil { + s.log.Error(err, "warmup failed") return err } + + s.log.Info("sync warmup finished") } + s.log.Info("start syncing configs") + return s.sync(ctx, controller, features...) } +// GetState returns the cached entity of the given object. +// It returns an error if it fails to internally generate a key for the specified object, +// and nil if the provided object doesn't have a cached state. +func (s *Sync) GetState(obj interface{}) (contracts.Entity, error) { + item, exist, err := s.store.Get(obj) + if err != nil { + return nil, err + } + + if !exist { + return nil, nil + } + + return item.(contracts.Entity), nil +} + +// Delete removes the given entity and all its references from the cache store. +func (s *Sync) Delete(entity contracts.Entity, cascade bool) { + if _, ok := entity.(database.HasRelations); ok && cascade { + items := s.store.List() + for _, it := range items { + item := it.(contracts.Entity) + if entity.ID().Equal(item.ParentID()) { + // Erase all references of this entity recursively from the cache store as well. + // Example: Remove v1.Pod -> v1.Container -> v1.ContainerMount etc... 
+ s.Delete(item, cascade) + } + } + } + + // We don't know whether there is a cached item by the given hash, so ignore any errors. + _ = s.store.Delete(entity) +} + func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { g, ctx := errgroup.WithContext(ctx) + s.log.Info("starting sync warmup") + + entity := s.factory() entities, errs := s.db.YieldAll(ctx, func() (interface{}, bool, error) { return s.factory(), true, nil - }, s.db.BuildSelectStmt(s.factory(), &schemav1.Meta{})) + }, s.db.BuildSelectStmt(entity, entity.Fingerprint())) // Let errors from YieldAll() cancel the group. com.ErrgroupReceive(g, errs) @@ -68,10 +114,16 @@ func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { return nil } - if err := c.Announce(e); err != nil { - fmt.Println(err) + if err := s.store.Add(e.(contracts.Entity).Fingerprint()); err != nil { return err } + + // The controller doesn't need to know about the entities of a k8s sub resource. + if resource, ok := e.(contracts.Resource); ok { + if err := c.Announce(resource); err != nil { + return err + } + } case <-ctx.Done(): return ctx.Err() } @@ -84,7 +136,7 @@ func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Feature) error { sink := sync.NewSink(func(i *sync.Item) interface{} { entity := s.factory() - entity.Obtain(i.Item) + entity.(contracts.Resource).Obtain(i.Item) return entity }, func(k interface{}) interface{} { @@ -97,47 +149,32 @@ func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Fe g.Go(func() error { defer runtime.HandleCrash() - err := c.Stream(ctx, sink) - if err != nil { - fmt.Println(err) - } - return err - }) - g.Go(func() error { - defer runtime.HandleCrash() - - err := s.db.UpsertStreamed( - ctx, sink.UpsertCh(), - database.WithCascading(), database.WithOnSuccess(with.OnUpsert())) - if err != nil { - fmt.Println(err) - } - return err + return c.Stream(ctx, sink) }) g.Go(func() error { - defer runtime.HandleCrash() + filterFunc := func(entity contracts.Entity) (bool, error) { + lastState, err := s.GetState(entity) + if err != nil { + return false, err + } - if with.NoDelete() { - for { - select { - case _, more := <-sink.DeleteCh(): - if !more { - return nil - } - case <-ctx.Done(): - return ctx.Err() - } + // Don't upsert the entities if their checksum hasn't been changed. + if lastState == nil || !entity.Checksum().Equal(lastState.Checksum()) { + _ = s.store.Add(entity.Fingerprint()) + return true, nil } - } else { - err := s.db.DeleteStreamed( - ctx, s.factory(), sink.DeleteCh(), - database.WithBlocking(), database.WithCascading(), database.WithOnSuccess(with.OnDelete())) - if err != nil { - fmt.Println(err) - } - return err + + return false, nil } + + return s.db.UpsertStreamed( + ctx, sink.UpsertCh(), + database.WithCascading(), database.WithPreExecution(filterFunc), database.WithOnSuccess(with.OnUpsert()), + ) + }) + g.Go(func() error { + return s.deleteEntities(ctx, s.factory(), sink.DeleteCh(), features...) }) g.Go(func() error { defer runtime.HandleCrash() @@ -153,13 +190,155 @@ func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Fe case <-ctx.Done(): return ctx.Err() } - } }) err := g.Wait() if err != nil { - fmt.Println(err) + s.log.Error(err, "sync error") } return err } + +// deleteEntities consumes the entities from the provided delete stream and syncs them to the database. 
+// It also removes the streamed K8s entity and all its references from the cache store automatically. +// To prevent the sender goroutines of this stream from being blocked, the entities are still consumed +// from the stream even if the sync.WithNoDelete feature is specified. +func (s *Sync) deleteEntities(ctx context.Context, subject contracts.Entity, delete <-chan interface{}, features ...sync.Feature) error { + with := sync.NewFeatures(features...) + + if relations, ok := subject.(database.HasRelations); ok && !with.NoDelete() { + g, ctx := errgroup.WithContext(ctx) + streams := make(map[string]chan interface{}) + for _, relation := range relations.Relations() { + relation := relation + if !relation.CascadeDelete() { + continue + } + + if _, ok := relation.TypePointer().(contracts.Entity); !ok { + // This shouldn't crush the daemon, when some of the k8s types specify a relation + // that doesn't satisfy the contracts.Entity interface. + continue + } + + relationCh := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(relationCh) + + return s.deleteEntities(ctx, relation.TypePointer().(contracts.Entity), relationCh) + }) + streams[database.TableName(relation)] = relationCh + } + + deleteIds := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(deleteIds) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case id, ok := <-delete: + if !ok { + return nil + } + + if _, ok := id.(types.Binary); !ok { + id = types.Binary(id.([]byte)) + } + + subject.SetID(id.(types.Binary)) + items := s.store.List() + // First delete all references before deleting the parent entity. + for _, item := range items { + entity := item.(contracts.Entity) + if subject.ID().Equal(entity.ParentID()) { + for _, relation := range subject.(database.HasRelations).Relations() { + relation := relation + if reflect.TypeOf(relation.TypePointer().(contracts.Entity).Fingerprint()) == reflect.TypeOf(entity) { + select { + case streams[database.TableName(relation)] <- entity.ID(): + case <-ctx.Done(): + return ctx.Err() + } + } + } + } + } + + select { + case deleteIds <- id: + case <-ctx.Done(): + return ctx.Err() + } + + s.Delete(subject, false) + } + } + }) + + g.Go(func() error { + defer runtime.HandleCrash() + + return s.db.DeleteStreamed(ctx, subject, deleteIds, database.WithBlocking(), database.WithOnSuccess(with.OnDelete())) + }) + + return g.Wait() + } + + g, ctx := errgroup.WithContext(ctx) + deleteIds := make(chan interface{}) + g.Go(func() error { + defer runtime.HandleCrash() + defer close(deleteIds) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case id, ok := <-delete: + if !ok { + return nil + } + + if !with.NoDelete() { + select { + case deleteIds <- id: + case <-ctx.Done(): + return ctx.Err() + } + + if _, ok := id.(types.Binary); !ok { + id = types.Binary(id.([]byte)) + } + + subject.SetID(id.(types.Binary)) + s.Delete(subject, false) + } + } + } + }) + + if !with.NoDelete() { + g.Go(func() error { + defer runtime.HandleCrash() + + return s.db.DeleteStreamed(ctx, subject, deleteIds, database.WithBlocking(), database.WithOnSuccess(with.OnDelete())) + }) + } + + return g.Wait() +} + +// ObjectMetaKeyFunc provides a custom implementation of object key extraction for caching. +// The given object has to satisfy the contracts.IDer interface if it's not an explicit key. 
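(Editorial note: NewSync wires this in via cache.NewStore(ObjectMetaKeyFunc) above, so schema entities are keyed by their ID while plain cache.ExplicitKey values still fall back to cache.MetaNamespaceKeyFunc.)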
+func ObjectMetaKeyFunc(obj interface{}) (string, error) { + if _, ok := obj.(cache.ExplicitKey); ok { + return cache.MetaNamespaceKeyFunc(obj) + } + + return obj.(contracts.IDer).ID().String(), nil +} diff --git a/pkg/types/objectpacker.go b/pkg/types/objectpacker.go index 3f89bed..ef9e842 100644 --- a/pkg/types/objectpacker.go +++ b/pkg/types/objectpacker.go @@ -5,11 +5,74 @@ import ( "encoding/binary" "fmt" "github.com/pkg/errors" + "golang.org/x/exp/slices" "io" "reflect" "sort" + "sync" ) +var ( + structFields = map[reflect.Type][]StructField{} + mu sync.Mutex +) + +// StructField represents a single struct field, just like reflect.StructField, but with far fewer member fields. +type StructField struct { + Name string // This field name is only used for sorting the struct fields slice. + Index []int // This index is just used for lookup. +} + +// HashStruct generates the SHA-1 checksum of all extracted fields of the given struct. +// By default, this will hash all struct fields except an embedded struct, anonymous and unexported fields. +// Additionally, you can also exclude some struct fields by using the `hash:"-"` tag. +func HashStruct(subject interface{}) Binary { + v := reflect.ValueOf(subject) + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + + fields := getFields(v) + values := make([]interface{}, 0, len(fields)) + for _, field := range fields { + values = append(values, v.FieldByIndex(field.Index).Interface()) + } + + return Checksum(MustPackSlice(values...)) +} + +// getFields returns a slice of StructField extracted from the given subject. +// By default, this will include all struct fields except an embedded struct, anonymous and unexported fields. +// Additionally, you can also exclude some struct fields by using the `hash:"-"` tag. +func getFields(subject reflect.Value) []StructField { + mu.Lock() + defer mu.Unlock() + + var fields []StructField + + fields = structFields[subject.Type()] + if fields == nil { + for _, field := range reflect.VisibleFields(subject.Type()) { + // We don't want an embedded struct to be part of the generated hash! + if field.Type.Kind() == reflect.Struct || !field.IsExported() || field.Anonymous { + continue + } + + if field.Tag.Get("hash") != "ignore" && field.Tag.Get("hash") != "-" { + fields = append(fields, StructField{Name: field.Name, Index: field.Index}) + } + } + + slices.SortStableFunc(fields, func(a, b StructField) bool { + return a.Name < b.Name + }) + + structFields[subject.Type()] = fields + } + + return fields +} + // MustPackSlice calls PackAny using items and panics if there was an error.
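Illustrative sketch, not part of this patch: the `hash:"-"` tag semantics that the schema types above rely on. The exampleSpec type and checksumsMatch function are hypothetical; only types.HashStruct comes from this patch.

package example

import "github.com/icinga/icinga-kubernetes/pkg/types"

type exampleSpec struct {
	Name   string
	Phase  string
	Labels []string `hash:"-"` // excluded from the checksum, like the relation slices in pkg/schema
}

// checksumsMatch shows that fields tagged `hash:"-"` don't influence the checksum.
func checksumsMatch() bool {
	a := exampleSpec{Name: "web", Phase: "running", Labels: []string{"app=web"}}
	b := exampleSpec{Name: "web", Phase: "running"}

	// Only Name and Phase are packed and hashed, so both checksums are equal.
	return types.HashStruct(a).Equal(types.HashStruct(b)) // true
}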
func MustPackSlice(items ...interface{}) []byte { var buf bytes.Buffer @@ -28,6 +91,8 @@ func MustPackSlice(items ...interface{}) []byte { // PackAny(false) => 0x1 // PackAny(true) => 0x2 // PackAny(float64(42)) => 0x3 ieee754_binary64_bigendian(42) +// PackAny(int(42)) => 0x7 int64_binary64_bigendian(42) +// PackAny(uint(42)) => 0x8 uint64_binary64_bigendian(42) // PackAny("exämple") => 0x4 uint64_bigendian(len([]byte("exämple"))) []byte("exämple") // PackAny([]uint8{0x42}) => 0x4 uint64_bigendian(len([]uint8{0x42})) []uint8{0x42} // PackAny([1]uint8{0x42}) => 0x4 uint64_bigendian(len([1]uint8{0x42})) [1]uint8{0x42} @@ -66,6 +131,18 @@ func packValue(in reflect.Value, out io.Writer) error { } return binary.Write(out, binary.BigEndian, in.Float()) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if _, err := out.Write([]byte{7}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Int()) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if _, err := out.Write([]byte{8}); err != nil { + return err + } + + return binary.Write(out, binary.BigEndian, in.Uint()) case reflect.Array, reflect.Slice: if typ := in.Type(); typ.Elem() == tByte { if kind == reflect.Array { diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 0ff534a..130064b 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1,5 +1,6 @@ CREATE TABLE namespace ( id binary(20) NOT NULL COMMENT 'sha1(name)', + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, /* TODO: Remove. A namespace does not have a namespace. */ name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -10,17 +11,20 @@ CREATE TABLE namespace ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE namespace_condition ( + id binary(20) NOT NULL COMMENT 'sha1(namespace.id + type)', namespace_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (namespace_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -39,26 +43,31 @@ CREATE TABLE node ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node_condition ( + id binary(20) NOT NULL COMMENT 'sha1(node.id + type)', node_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_heartbeat bigint unsigned NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (node_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE node_volume ( + id binary(20) NOT 
NULL COMMENT 'sha1(node.id + name)', node_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, device_path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, mounted enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (node_id, name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pod ( id binary(20) NOT NULL COMMENT 'sha1(namespace/name)', + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -80,44 +89,53 @@ CREATE TABLE pod ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pod_condition ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + type)', pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_probe bigint unsigned NULL DEFAULT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (pod_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pod_owner ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + uid)', pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', kind enum('daemon_set', 'node', 'replica_set', 'stateful_set') COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, controller enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, block_owner_deletion enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (pod_id, uid) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pod_pvc ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + volume_name + claim_name)', pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, claim_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, read_only enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (pod_id, volume_name, claim_name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pod_volume ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + volume_name)', pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, source longtext NOT NULL, - PRIMARY KEY (pod_id, volume_name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE container ( id binary(20) NOT NULL COMMENT 'sha1(pod.namespace/pod.name/name)', + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', pod_id binary(20) NOT NULL, name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, image varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -135,25 +153,28 @@ CREATE TABLE container ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE container_device ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + container.id 
+ name)', container_id binary(20) NOT NULL, - pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (container_id, name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE container_mount ( + id binary(20) NOT NULL COMMENT 'sha1(pod.id + container.id + volume_name)', container_id binary(20) NOT NULL, - pod_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', volume_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, sub_path varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, read_only enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (container_id, volume_name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE deployment ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -173,17 +194,21 @@ CREATE TABLE deployment ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE deployment_condition ( + id binary(20) NOT NULL COMMENT 'sha1(deployment.id + type)', deployment_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type enum('available', 'progressing', 'replica_failure') COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_update bigint unsigned NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (deployment_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE service ( + id binary(20) NOT NULL COMMENT 'sha1(namespace/name)', + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -191,11 +216,12 @@ CREATE TABLE service ( type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, cluster_ip varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, created bigint unsigned NOT NULL, - PRIMARY KEY (namespace, name) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE replica_set ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -211,17 +237,21 @@ CREATE TABLE replica_set ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE replica_set_condition ( + id binary(20) NOT NULL COMMENT 'sha1(replica_set.id + type)', replica_set_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type enum('replica_failure') COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE 
utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (replica_set_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE replica_set_owner ( + id binary(20) NOT NULL COMMENT 'sha1(replica_set.id + uuid)', replica_set_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', kind enum('deployment') COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -232,6 +262,7 @@ CREATE TABLE replica_set_owner ( CREATE TABLE daemon_set ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -250,17 +281,20 @@ CREATE TABLE daemon_set ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE daemon_set_condition ( + id binary(20) NOT NULL COMMENT 'sha1(daemon_set.id + type)', daemon_set_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (daemon_set_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE stateful_set ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -283,60 +317,41 @@ CREATE TABLE stateful_set ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE stateful_set_condition ( + id binary(20) NOT NULL COMMENT 'sha1(stateful_set.id + type)', stateful_set_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (stateful_set_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE label ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', + pod_id binary(20) DEFAULT NULL, + replica_set_id binary(20) DEFAULT NULL, + deployment_id binary(20) DEFAULT NULL, + daemon_set_id binary(20) DEFAULT NULL, + stateful_set_id binary(20) DEFAULT NULL, + pvc_id binary(20) DEFAULT NULL, name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, value varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - -CREATE TABLE pod_label ( - pod_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (pod_id, label_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - -CREATE TABLE replica_set_label ( - replica_set_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (replica_set_id, label_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - -CREATE TABLE deployment_label ( - 
deployment_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (deployment_id, label_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE daemon_set_label ( - daemon_set_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (daemon_set_id, label_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; + PRIMARY KEY (id), -CREATE TABLE stateful_set_label ( - stateful_set_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (stateful_set_id, label_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - -CREATE TABLE pvc_label ( - pvc_id binary(20) NOT NULL, - label_id binary(20) NOT NULL, - PRIMARY KEY (pvc_id, label_id) + -- The CONSTRAINT below ensures that each row is allowed to have non-NULL values in one of these constraints at a time. + CONSTRAINT nonnulls_label_consumers_check CHECK ( + (pod_id IS NOT NULL) + (replica_set_id IS NOT NULL) + (deployment_id IS NOT NULL) + (daemon_set_id IS NOT NULL) + + (stateful_set_id IS NOT NULL) + (pvc_id IS NOT NULL) = 1 + ) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE event ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) NOT NULL, name varchar(253) NOT NULL, uid varchar(255) NOT NULL, @@ -372,6 +387,7 @@ CREATE TABLE pod_metrics ( CREATE TABLE pvc ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -389,18 +405,21 @@ CREATE TABLE pvc ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE pvc_condition ( + id binary(20) NOT NULL COMMENT 'sha1(pvc.id + type)', pvc_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', type varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, status varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, last_probe bigint unsigned NULL DEFAULT NULL, last_transition bigint unsigned NOT NULL, reason varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, message varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - PRIMARY KEY (pvc_id, type) + PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE persistent_volume ( id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, @@ -420,7 +439,9 @@ CREATE TABLE persistent_volume ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE persistent_volume_claim_ref ( + id binary(20) NOT NULL COMMENT 'sha1(persistent_volume.id + uuid)', persistent_volume_id binary(20) NOT NULL, + properties_checksum binary(20) NOT NULL COMMENT 'sha1(all properties)', kind varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,