From 844dacd116a4de0bc249e1d69443ced5ecad4572 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 21 Nov 2023 10:59:09 +0100 Subject: [PATCH 01/12] Add forward channels to sync struct --- cmd/icinga-kubernetes/main.go | 30 +++++++++++++++++++++++++++--- pkg/sync/sync.go | 33 ++++++++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 90924229..17634999 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -70,21 +70,45 @@ func main() { g, ctx := errgroup.WithContext(ctx) + forwardUpsertNodesChannel := make(chan<- any) + forwardDeleteNodesChannel := make(chan<- any) + + forwardUpsertNamespacesChannel := make(chan<- any) + forwardDeleteNamespacesChannel := make(chan<- any) + + forwardUpsertPodsChannel := make(chan<- any) + forwardDeletePodsChannel := make(chan<- any) + g.Go(func() error { return sync.NewSync( - db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"), + db, + schema.NewNode, + informers.Core().V1().Nodes().Informer(), + logs.GetChildLogger("Nodes"), + sync.WithForwardUpsert(forwardUpsertNodesChannel), + sync.WithForwardDelete(forwardDeleteNodesChannel), ).Run(ctx) }) g.Go(func() error { return sync.NewSync( - db, schema.NewNamespace, informers.Core().V1().Namespaces().Informer(), logs.GetChildLogger("Namespaces"), + db, + schema.NewNamespace, + informers.Core().V1().Namespaces().Informer(), + logs.GetChildLogger("Namespaces"), + sync.WithForwardUpsert(forwardUpsertNamespacesChannel), + sync.WithForwardDelete(forwardDeleteNamespacesChannel), ).Run(ctx) }) g.Go(func() error { return sync.NewSync( - db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), + db, + schema.NewPod, + informers.Core().V1().Pods().Informer(), + logs.GetChildLogger("Pods"), + sync.WithForwardUpsert(forwardUpsertPodsChannel), + sync.WithForwardDelete(forwardDeletePodsChannel), ).Run(ctx) }) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index d0d788db..941ec06e 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -14,15 +14,31 @@ import ( kcache "k8s.io/client-go/tools/cache" ) +type Option func(s *sync) + +func WithForwardUpsert(channel chan<- any) Option { + return func(s *sync) { + s.forwardUpsertChannel = channel + } +} + +func WithForwardDelete(channel chan<- any) Option { + return func(s *sync) { + s.forwardDeleteChannel = channel + } +} + type Sync interface { Run(context.Context) error } type sync struct { - db *database.DB - factory func() contracts.Resource - informer kcache.SharedInformer - logger *logging.Logger + db *database.DB + factory func() contracts.Resource + informer kcache.SharedInformer + logger *logging.Logger + forwardUpsertChannel chan<- any + forwardDeleteChannel chan<- any } func NewSync( @@ -30,13 +46,20 @@ func NewSync( factory func() contracts.Resource, informer kcache.SharedInformer, logger *logging.Logger, + options ...Option, ) Sync { - return &sync{ + s := &sync{ db: db, informer: informer, logger: logger, factory: factory, } + + for _, option := range options { + option(s) + } + + return s } func (s *sync) Run(ctx context.Context) error { From e612b8a39ce2f6ea6399cb6baf01de34f80db943 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 21 Nov 2023 11:34:27 +0100 Subject: [PATCH 02/12] Defer close forward channels --- cmd/icinga-kubernetes/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 17634999..d368fa39 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -71,13 +71,22 @@ func main() { g, ctx := errgroup.WithContext(ctx) forwardUpsertNodesChannel := make(chan<- any) + defer close(forwardUpsertNodesChannel) + forwardDeleteNodesChannel := make(chan<- any) + defer close(forwardDeleteNodesChannel) forwardUpsertNamespacesChannel := make(chan<- any) + defer close(forwardUpsertNamespacesChannel) + forwardDeleteNamespacesChannel := make(chan<- any) + defer close(forwardDeleteNamespacesChannel) forwardUpsertPodsChannel := make(chan<- any) + defer close(forwardUpsertPodsChannel) + forwardDeletePodsChannel := make(chan<- any) + defer close(forwardDeletePodsChannel) g.Go(func() error { return sync.NewSync( From efb75366a21f1116337701935f9b03473e365692 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 21 Nov 2023 12:23:01 +0100 Subject: [PATCH 03/12] Add log table to database and reformat schema --- schema/mysql/schema.sql | 77 ++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f6eab658..93aebbb1 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1,30 +1,51 @@ -CREATE TABLE node ( - id binary(20) NOT NULL, - canonical_name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, - name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, - uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version varchar(255) NOT NULL, - created bigint unsigned NOT NULL, - PRIMARY KEY (id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +CREATE TABLE node +( + id BINARY(20) NOT NULL, + canonical_name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, + name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, + uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version VARCHAR(255) NOT NULL, + created BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (id) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_bin; -CREATE TABLE namespace ( - id binary(20) NOT NULL, - canonical_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, - name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, - uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version varchar(255) NOT NULL, - created bigint unsigned NOT NULL, - PRIMARY KEY (id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +CREATE TABLE namespace +( + id BINARY(20) NOT NULL, + canonical_name VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, + name VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, + uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version VARCHAR(255) NOT NULL, + created BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (id) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_bin; -CREATE TABLE pod ( - id binary(20) NOT NULL, - canonical_name varchar(317) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'namespace/name', - namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, - name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, - uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version varchar(255) NOT NULL, - created bigint unsigned NOT NULL, - PRIMARY KEY (id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +CREATE TABLE pod +( + id BINARY(20) NOT NULL, + canonical_name VARCHAR(317) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'namespace/name', + namespace VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, + name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, + uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version VARCHAR(255) NOT NULL, + created BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (id) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_bin; + +CREATE TABLE log +( + id BINARY(20) NOT NULL, + reference_id BINARY(20) NOT NULL, + container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, + time LONGTEXT NOT NULL, + log LONGTEXT NOT NULL, + PRIMARY KEY (id) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_bin; From 604cfa12abcbac5cf7dd628c60179794ded44e7e Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 21 Nov 2023 16:47:44 +0100 Subject: [PATCH 04/12] Add feature for syncing container logs --- cmd/icinga-kubernetes/main.go | 48 ++++--- pkg/schema/container.go | 19 +++ pkg/schema/log.go | 10 ++ pkg/sync/logs.go | 241 ++++++++++++++++++++++++++++++++++ pkg/sync/spreader.go | 71 ++++++++++ pkg/sync/sync.go | 109 ++++++++++++--- 6 files changed, 460 insertions(+), 38 deletions(-) create mode 100644 pkg/schema/container.go create mode 100644 pkg/schema/log.go create mode 100644 pkg/sync/logs.go create mode 100644 pkg/sync/spreader.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index d368fa39..b8ea8156 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-go-library/driver" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-kubernetes/internal" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/schema" "github.com/icinga/icinga-kubernetes/pkg/sync" "github.com/okzk/sdnotify" @@ -70,22 +71,10 @@ func main() { g, ctx := errgroup.WithContext(ctx) - forwardUpsertNodesChannel := make(chan<- any) - defer close(forwardUpsertNodesChannel) - - forwardDeleteNodesChannel := make(chan<- any) - defer close(forwardDeleteNodesChannel) - - forwardUpsertNamespacesChannel := make(chan<- any) - defer close(forwardUpsertNamespacesChannel) - - forwardDeleteNamespacesChannel := make(chan<- any) - defer close(forwardDeleteNamespacesChannel) - - forwardUpsertPodsChannel := make(chan<- any) + forwardUpsertPodsChannel := make(chan database.Entity) defer close(forwardUpsertPodsChannel) - forwardDeletePodsChannel := make(chan<- any) + forwardDeletePodsChannel := make(chan any) defer close(forwardDeletePodsChannel) g.Go(func() error { @@ -94,8 +83,6 @@ func main() { schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"), - sync.WithForwardUpsert(forwardUpsertNodesChannel), - sync.WithForwardDelete(forwardDeleteNodesChannel), ).Run(ctx) }) @@ -105,22 +92,43 @@ func main() { schema.NewNamespace, informers.Core().V1().Namespaces().Informer(), logs.GetChildLogger("Namespaces"), - sync.WithForwardUpsert(forwardUpsertNamespacesChannel), - sync.WithForwardDelete(forwardDeleteNamespacesChannel), ).Run(ctx) }) + //upsertPodChannelSpreader := sync.NewChannelSpreader[database.Entity](forwardUpsertPodsChannel) + //deletePodChannelSpreader := sync.NewChannelSpreader[any](forwardDeletePodsChannel) + + //upsertPodChannel := upsertPodChannelSpreader.NewChannel() + //deletePodChannel := deletePodChannelSpreader.NewChannel() + + forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert) + forwardDeletePodsToLogChannel := make(chan contracts.KDelete) + g.Go(func() error { + + defer close(forwardUpsertPodsToLogChannel) + defer close(forwardDeletePodsToLogChannel) + return sync.NewSync( db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), - sync.WithForwardUpsert(forwardUpsertPodsChannel), - sync.WithForwardDelete(forwardDeletePodsChannel), + sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel), + sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel), ).Run(ctx) }) + logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs")) + + g.Go(func() error { + return logSync.MaintainList(ctx, forwardUpsertPodsToLogChannel, forwardDeletePodsToLogChannel) + }) + + g.Go(func() error { + return logSync.Run(ctx) + }) + if err := g.Wait(); err != nil { logging.Fatal(errors.Wrap(err, "can't sync")) } diff --git a/pkg/schema/container.go b/pkg/schema/container.go new file mode 100644 index 00000000..cceefada --- /dev/null +++ b/pkg/schema/container.go @@ -0,0 +1,19 @@ +package schema + +// +//import ( +// "github.com/icinga/icinga-kubernetes/pkg/contracts" +// kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +//) +// +//type Container struct { +// kmetaWithNamespace +//} +// +//func NewContainer() contracts.Resource { +// return &Container{} +//} +// +//func (c *Container) Obtain(kobject kmetav1.Object) { +// c.kmetaWithNamespace.Obtain(kobject) +//} diff --git a/pkg/schema/log.go b/pkg/schema/log.go new file mode 100644 index 00000000..1ed69925 --- /dev/null +++ b/pkg/schema/log.go @@ -0,0 +1,10 @@ +package schema + +type Log struct { + kmetaWithoutNamespace + Id []byte + ReferenceId []byte + ContainerName string + Time string + Log string +} diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go new file mode 100644 index 00000000..0b0eca45 --- /dev/null +++ b/pkg/sync/logs.go @@ -0,0 +1,241 @@ +package sync + +import ( + "bufio" + "context" + "crypto/sha1" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/schema" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "io" + kcorev1 "k8s.io/api/core/v1" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "slices" + "strings" + msync "sync" + "time" +) + +type LogSync struct { + list []*kcorev1.Pod + lastChecked map[[20]byte]*kmetav1.Time + mutex *msync.RWMutex + clientset *kubernetes.Clientset + db *database.DB + logger *logging.Logger +} + +func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) *LogSync { + return &LogSync{ + list: []*kcorev1.Pod{}, + lastChecked: make(map[[20]byte]*kmetav1.Time), + mutex: &msync.RWMutex{}, + clientset: clientset, + db: db, + logger: logger, + } +} + +func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]byte) (times []string, messages []string, err error) { + + stringReader := strings.NewReader(string(log)) + reader := bufio.NewReader(stringReader) + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + return nil, nil, errors.Wrap(err, "error reading log message") + } + + messageTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", strings.Split(line, " ")[0]) + if err != nil { + logging.Fatal(errors.Wrap(err, "error parsing log timestamp")) + continue + } + + if ls.lastChecked[curContainerId] != nil && messageTime.UnixNano() <= ls.lastChecked[curContainerId].UnixNano() { + continue + } + + times = append(times, strings.Split(line, " ")[0]) + messages = append(messages, strings.Join(strings.Split(line, " ")[1:], " ")) + } + + return times, messages, nil +} + +func (ls *LogSync) removeFromList(id database.ID) { + out := make([]*kcorev1.Pod, 0) + + for _, element := range ls.list { + + elementId := sha1.Sum([]byte(element.Namespace + "/" + element.Name)) + + if fmt.Sprintf("%x", elementId) != id.String() { + out = append(out, element) + } + } + + ls.list = out +} + +func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { + + ls.logger.Info("Starting maintain list") + + g, ctx := errgroup.WithContext(ctx) + + deletes := make(chan any) + g.Go(func() error { + defer close(deletes) + + for { + select { + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context canceled maintain log sync list") + + case podFromChannel, more := <-addChannel: + if !more { + return nil + } + + pod := podFromChannel.KObject().(*kcorev1.Pod) + + podIsInList := false + + for _, listPod := range ls.list { + if listPod.UID == pod.UID { + podIsInList = true + } + } + + if podIsInList { + continue + } + + ls.mutex.RLock() + ls.list = append(ls.list, pod) + ls.mutex.RUnlock() + + case podIdFromChannel, more := <-deleteChannel: + if !more { + return nil + } + + idOfPod := podIdFromChannel.ID() + + ls.mutex.RLock() + ls.removeFromList(idOfPod) + ls.mutex.RUnlock() + + deletes <- idOfPod + } + + } + }) + + g.Go(func() error { + return ls.db.DeleteStreamedByField(ctx, &schema.Log{}, "reference_id", deletes) + }) + + return g.Wait() +} + +func (ls *LogSync) Run(ctx context.Context) error { + + ls.logger.Info("Starting sync") + + g, ctx := errgroup.WithContext(ctx) + + upsertStmt := ls.upsertStmt() + + upserts := make(chan database.Entity) + defer close(upserts) + + g.Go(func() error { + for { + for _, pod := range ls.list { + + curPodId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name)) + + for _, container := range pod.Spec.Containers { + + curContainerId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name + "/" + container.Name)) + + podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} + + if ls.lastChecked[curContainerId] != nil { + podLogOpts.SinceTime = ls.lastChecked[curContainerId] + } + + log, err := ls.clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts).Do(ctx).Raw() + if err != nil { + fmt.Println(errors.Wrap(err, "error reading container log")) + continue + } + + times, messages, err := ls.splitTimestampsFromMessages(log, curContainerId) + if err != nil { + return err + } + + if len(messages) == 0 { + continue + } + + newLog := &schema.Log{ + Id: curContainerId[:], + ReferenceId: curPodId[:], + ContainerName: container.Name, + Time: strings.Join(times, "\n"), + Log: strings.Join(messages, "\n"), + } + + upserts <- newLog + + lastTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", times[len(times)-1]) + if err != nil { + return errors.Wrap(err, "error parsing log time") + } + + if !slices.Contains(ls.list, pod) { + continue + } + + lastV1Time := kmetav1.Time{Time: lastTime} + ls.lastChecked[curContainerId] = &lastV1Time + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 5): + } + } + }) + + g.Go(func() error { + return ls.db.UpsertStreamedWithStatement(ctx, upserts, upsertStmt, 5) + }) + + return g.Wait() +} + +func (ls *LogSync) upsertStmt() string { + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", + "log", + "id, reference_id, container_name, time, log", + ":id, :reference_id, :container_name, :time, :log", + "time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)", + ) +} diff --git a/pkg/sync/spreader.go b/pkg/sync/spreader.go new file mode 100644 index 00000000..3ce9f8e2 --- /dev/null +++ b/pkg/sync/spreader.go @@ -0,0 +1,71 @@ +package sync + +import ( + "context" + "sync/atomic" +) + +type ChannelSpreader[T any] struct { + channelToBreak <-chan T + createdChannels []chan<- T + channels []chan<- T + started atomic.Bool +} + +func NewChannelSpreader[T any](channelToBreak <-chan T) *ChannelSpreader[T] { + return &ChannelSpreader[T]{ + channelToBreak: channelToBreak, + } +} + +func (cs *ChannelSpreader[T]) NewChannel() <-chan T { + if cs.started.Load() == true { + panic("ChannelSpreader already started") + } + + channel := make(chan T) + cs.createdChannels = append(cs.createdChannels, channel) + + return channel +} + +func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) { + if cs.started.Load() == true { + panic("ChannelSpreader already started") + } + + cs.channels = append(cs.channels, channel) +} + +func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { + + cs.started.Store(true) + + defer func() { + for _, channelToClose := range cs.createdChannels { + close(channelToClose) + } + }() + + cs.channels = append(cs.channels, cs.createdChannels...) + + for { + select { + case spread, more := <-cs.channelToBreak: + if !more { + return nil + } + + for _, channel := range cs.channels { + select { + case channel <- spread: + case <-ctx.Done(): + return ctx.Err() + } + } + + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 941ec06e..de041a37 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -16,15 +16,15 @@ import ( type Option func(s *sync) -func WithForwardUpsert(channel chan<- any) Option { +func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) Option { return func(s *sync) { - s.forwardUpsertChannel = channel + s.forwardUpsertToLogChannel = channel } } -func WithForwardDelete(channel chan<- any) Option { +func WithForwardDeleteToLog(channel chan<- contracts.KDelete) Option { return func(s *sync) { - s.forwardDeleteChannel = channel + s.forwardDeleteToLogChannel = channel } } @@ -33,12 +33,12 @@ type Sync interface { } type sync struct { - db *database.DB - factory func() contracts.Resource - informer kcache.SharedInformer - logger *logging.Logger - forwardUpsertChannel chan<- any - forwardDeleteChannel chan<- any + db *database.DB + factory func() contracts.Resource + informer kcache.SharedInformer + logger *logging.Logger + forwardUpsertToLogChannel chan<- contracts.KUpsert + forwardDeleteToLogChannel chan<- contracts.KDelete } func NewSync( @@ -88,8 +88,27 @@ func (s *sync) Run(ctx context.Context) error { s.logger.Debug("Finished warming up") - upserts := make(chan database.Entity) - defer close(upserts) + s.factory().GetResourceVersion() + + // init upsert channel spreader + multiplexUpsertChannel := make(chan contracts.KUpsert) + defer close(multiplexUpsertChannel) + + multiplexUpsert := NewChannelSpreader[contracts.KUpsert](multiplexUpsertChannel) + + upsertChannel := multiplexUpsert.NewChannel() + + if s.forwardUpsertToLogChannel != nil { + multiplexUpsert.AddChannel(s.forwardUpsertToLogChannel) + } + + // run upsert channel spreader + g.Go(func() error { + return multiplexUpsert.Run(ctx) + }) + + upsertToStream := make(chan database.Entity) + defer close(upsertToStream) for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { ch := ch @@ -102,13 +121,32 @@ func (s *sync) Run(ctx context.Context) error { return nil } + select { + case multiplexUpsertChannel <- kupsert: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for { + select { + case kupsert, more := <-upsertChannel: + if !more { + return nil + } + entity := s.factory() entity.SetID(kupsert.ID()) entity.SetCanonicalName(kupsert.GetCanonicalName()) entity.Obtain(kupsert.KObject()) select { - case upserts <- entity: + case upsertToStream <- entity: s.logger.Debugw( fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), zap.String("id", kupsert.ID().String())) @@ -123,11 +161,25 @@ func (s *sync) Run(ctx context.Context) error { } g.Go(func() error { - return s.db.UpsertStreamed(ctx, upserts) + return s.db.UpsertStreamed(ctx, upsertToStream) }) - deletes := make(chan any) - defer close(deletes) + // init delete channel spreader + multiplexDeleteChannel := make(chan contracts.KDelete) + defer close(multiplexDeleteChannel) + + multiplexDelete := NewChannelSpreader[contracts.KDelete](multiplexDeleteChannel) + + deleteChannel := multiplexDelete.NewChannel() + + if s.forwardDeleteToLogChannel != nil { + multiplexDelete.AddChannel(s.forwardDeleteToLogChannel) + } + + // run delete channel spreader + g.Go(func() error { + return multiplexDelete.Run(ctx) + }) g.Go(func() error { for { @@ -136,9 +188,30 @@ func (s *sync) Run(ctx context.Context) error { if !more { return nil } + select { + case multiplexDeleteChannel <- kdelete: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + deleteToStream := make(chan any) + g.Go(func() error { + defer close(deleteToStream) + + for { + select { + case kdelete, more := <-deleteChannel: + if !more { + return nil + } select { - case deletes <- kdelete.ID(): + case deleteToStream <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -152,7 +225,7 @@ func (s *sync) Run(ctx context.Context) error { }) g.Go(func() error { - return s.db.DeleteStreamed(ctx, s.factory(), deletes) + return s.db.DeleteStreamed(ctx, s.factory(), deleteToStream) }) g.Go(func() error { From 771ef0aded187247d684beae8821558e0bed47fe Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Fri, 24 Nov 2023 14:00:15 +0100 Subject: [PATCH 05/12] Rebuild functional options --- cmd/icinga-kubernetes/main.go | 10 ++--- pkg/sync/sync.go | 70 ++++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index b8ea8156..830ff08e 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -95,12 +95,6 @@ func main() { ).Run(ctx) }) - //upsertPodChannelSpreader := sync.NewChannelSpreader[database.Entity](forwardUpsertPodsChannel) - //deletePodChannelSpreader := sync.NewChannelSpreader[any](forwardDeletePodsChannel) - - //upsertPodChannel := upsertPodChannelSpreader.NewChannel() - //deletePodChannel := deletePodChannelSpreader.NewChannel() - forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert) forwardDeletePodsToLogChannel := make(chan contracts.KDelete) @@ -114,9 +108,11 @@ func main() { schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), + ).Run( + ctx, sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel), sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel), - ).Run(ctx) + ) }) logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs")) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index de041a37..acecbf30 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -14,31 +14,15 @@ import ( kcache "k8s.io/client-go/tools/cache" ) -type Option func(s *sync) - -func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) Option { - return func(s *sync) { - s.forwardUpsertToLogChannel = channel - } -} - -func WithForwardDeleteToLog(channel chan<- contracts.KDelete) Option { - return func(s *sync) { - s.forwardDeleteToLogChannel = channel - } -} - type Sync interface { - Run(context.Context) error + Run(context.Context, ...SyncOption) error } type sync struct { - db *database.DB - factory func() contracts.Resource - informer kcache.SharedInformer - logger *logging.Logger - forwardUpsertToLogChannel chan<- contracts.KUpsert - forwardDeleteToLogChannel chan<- contracts.KDelete + db *database.DB + factory func() contracts.Resource + informer kcache.SharedInformer + logger *logging.Logger } func NewSync( @@ -46,7 +30,6 @@ func NewSync( factory func() contracts.Resource, informer kcache.SharedInformer, logger *logging.Logger, - options ...Option, ) Sync { s := &sync{ db: db, @@ -55,14 +38,39 @@ func NewSync( factory: factory, } - for _, option := range options { - option(s) + return s +} + +func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) SyncOption { + return func(options *SyncOptions) { + options.forwardUpsertToLogChannel = channel + } +} + +func WithForwardDeleteToLog(channel chan<- contracts.KDelete) SyncOption { + return func(options *SyncOptions) { + options.forwardDeleteToLogChannel = channel } +} - return s +type SyncOption func(options *SyncOptions) + +type SyncOptions struct { + forwardUpsertToLogChannel chan<- contracts.KUpsert + forwardDeleteToLogChannel chan<- contracts.KDelete } -func (s *sync) Run(ctx context.Context) error { +func NewOptionStorage(execOptions ...SyncOption) *SyncOptions { + optionStorage := &SyncOptions{} + + for _, option := range execOptions { + option(optionStorage) + } + + return optionStorage +} + +func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -90,6 +98,8 @@ func (s *sync) Run(ctx context.Context) error { s.factory().GetResourceVersion() + syncOptions := NewOptionStorage(execOptions...) + // init upsert channel spreader multiplexUpsertChannel := make(chan contracts.KUpsert) defer close(multiplexUpsertChannel) @@ -98,8 +108,8 @@ func (s *sync) Run(ctx context.Context) error { upsertChannel := multiplexUpsert.NewChannel() - if s.forwardUpsertToLogChannel != nil { - multiplexUpsert.AddChannel(s.forwardUpsertToLogChannel) + if syncOptions.forwardUpsertToLogChannel != nil { + multiplexUpsert.AddChannel(syncOptions.forwardUpsertToLogChannel) } // run upsert channel spreader @@ -172,8 +182,8 @@ func (s *sync) Run(ctx context.Context) error { deleteChannel := multiplexDelete.NewChannel() - if s.forwardDeleteToLogChannel != nil { - multiplexDelete.AddChannel(s.forwardDeleteToLogChannel) + if syncOptions.forwardDeleteToLogChannel != nil { + multiplexDelete.AddChannel(syncOptions.forwardDeleteToLogChannel) } // run delete channel spreader From 73334041bf0991be9351dd958cdc38a7e612841b Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Fri, 24 Nov 2023 14:30:55 +0100 Subject: [PATCH 06/12] Use adjusted 'UpsertStreamed' and 'DeleteStreamed' from icinga-go-library --- pkg/sync/logs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index 0b0eca45..d95f8f65 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -143,7 +143,7 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts }) g.Go(func() error { - return ls.db.DeleteStreamedByField(ctx, &schema.Log{}, "reference_id", deletes) + return ls.db.DeleteStreamed(ctx, &schema.Log{}, deletes, database.ByColumn("reference_id")) }) return g.Wait() @@ -224,7 +224,7 @@ func (ls *LogSync) Run(ctx context.Context) error { }) g.Go(func() error { - return ls.db.UpsertStreamedWithStatement(ctx, upserts, upsertStmt, 5) + return ls.db.UpsertStreamed(ctx, upserts, database.WithStatement(upsertStmt, 5)) }) return g.Wait() From b7d3bcede6fdd0a44402c73f2528c0288f654c76 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Mon, 27 Nov 2023 09:20:27 +0100 Subject: [PATCH 07/12] Comment logs and channel spreader --- pkg/sync/logs.go | 32 +++++++++++++++++++++----------- pkg/sync/spreader.go | 5 +++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index d95f8f65..77d5cc4f 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -21,6 +21,8 @@ import ( "time" ) +// LogSync syncs logs to database. Therefore, it maintains a list +// of pod elements to get logs from type LogSync struct { list []*kcorev1.Pod lastChecked map[[20]byte]*kmetav1.Time @@ -30,6 +32,7 @@ type LogSync struct { logger *logging.Logger } +// NewLogSync creates new LogSync initialized with clientset, database and logger func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) *LogSync { return &LogSync{ list: []*kcorev1.Pod{}, @@ -41,6 +44,19 @@ func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *loggin } } +// upsertStmt returns database upsert statement +func (ls *LogSync) upsertStmt() string { + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", + "log", + "id, reference_id, container_name, time, log", + ":id, :reference_id, :container_name, :time, :log", + "time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)", + ) +} + +// splitTimestampsFromMessages takes a log as []byte and returns timestamps and messages as separate string slices. +// Additionally, it updates the last checked timestamp for the log func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]byte) (times []string, messages []string, err error) { stringReader := strings.NewReader(string(log)) @@ -72,6 +88,7 @@ func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]by return times, messages, nil } +// removeFromList removes pod from maintained list func (ls *LogSync) removeFromList(id database.ID) { out := make([]*kcorev1.Pod, 0) @@ -87,6 +104,7 @@ func (ls *LogSync) removeFromList(id database.ID) { ls.list = out } +// MaintainList adds pods from the addChannel to the list and deletes pods from the deleteChannel from the list func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { ls.logger.Info("Starting maintain list") @@ -149,6 +167,8 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts return g.Wait() } +// Run starts syncing the logs to the database. Therefore, it loops over all +// containers of each pod in the maintained list every 15 seconds. func (ls *LogSync) Run(ctx context.Context) error { ls.logger.Info("Starting sync") @@ -218,7 +238,7 @@ func (ls *LogSync) Run(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(time.Second * 5): + case <-time.After(time.Second * 15): } } }) @@ -229,13 +249,3 @@ func (ls *LogSync) Run(ctx context.Context) error { return g.Wait() } - -func (ls *LogSync) upsertStmt() string { - return fmt.Sprintf( - "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", - "log", - "id, reference_id, container_name, time, log", - ":id, :reference_id, :container_name, :time, :log", - "time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)", - ) -} diff --git a/pkg/sync/spreader.go b/pkg/sync/spreader.go index 3ce9f8e2..2115550a 100644 --- a/pkg/sync/spreader.go +++ b/pkg/sync/spreader.go @@ -5,6 +5,7 @@ import ( "sync/atomic" ) +// ChannelSpreader takes a channel of type T and fans it out to an array of other channels of type T type ChannelSpreader[T any] struct { channelToBreak <-chan T createdChannels []chan<- T @@ -12,12 +13,14 @@ type ChannelSpreader[T any] struct { started atomic.Bool } +// NewChannelSpreader creates new ChannelSpreader initialized with the channel to break func NewChannelSpreader[T any](channelToBreak <-chan T) *ChannelSpreader[T] { return &ChannelSpreader[T]{ channelToBreak: channelToBreak, } } +// NewChannel returns and adds new output channel to the list of created channels func (cs *ChannelSpreader[T]) NewChannel() <-chan T { if cs.started.Load() == true { panic("ChannelSpreader already started") @@ -29,6 +32,7 @@ func (cs *ChannelSpreader[T]) NewChannel() <-chan T { return channel } +// AddChannel adds given output channel to the list of added channels func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) { if cs.started.Load() == true { panic("ChannelSpreader already started") @@ -37,6 +41,7 @@ func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) { cs.channels = append(cs.channels, channel) } +// Run combines the lists and starts fanning out the channel to the channels from the list func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { cs.started.Store(true) From 9b4b98be0190c3592cdf0240e98cdcf5081575a8 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 28 Nov 2023 08:59:54 +0100 Subject: [PATCH 08/12] Use new Upsert and Delete structs --- pkg/sync/logs.go | 4 ++-- pkg/sync/sync.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index 77d5cc4f..f1715efd 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -161,7 +161,7 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts }) g.Go(func() error { - return ls.db.DeleteStreamed(ctx, &schema.Log{}, deletes, database.ByColumn("reference_id")) + return database.NewDelete(ls.db).ByColumn("reference_id").Stream(ctx, &schema.Log{}, deletes) }) return g.Wait() @@ -244,7 +244,7 @@ func (ls *LogSync) Run(ctx context.Context) error { }) g.Go(func() error { - return ls.db.UpsertStreamed(ctx, upserts, database.WithStatement(upsertStmt, 5)) + return database.NewUpsert(ls.db).WithStatement(upsertStmt, 5).Stream(ctx, upserts) }) return g.Wait() diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index acecbf30..5c587010 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -171,7 +171,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { } g.Go(func() error { - return s.db.UpsertStreamed(ctx, upsertToStream) + return database.NewUpsert(s.db).Stream(ctx, upsertToStream) }) // init delete channel spreader @@ -235,7 +235,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return s.db.DeleteStreamed(ctx, s.factory(), deleteToStream) + return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) }) g.Go(func() error { From c67bf64ed774140988266666eedca00aa21b9867 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 28 Nov 2023 14:46:38 +0100 Subject: [PATCH 09/12] Fix review --- cmd/icinga-kubernetes/main.go | 38 +++++----------- go.mod | 11 ++--- go.sum | 23 +++++----- pkg/schema/container.go | 19 -------- pkg/sync/logs.go | 22 +++------ pkg/sync/spreader.go | 22 +++++---- pkg/sync/sync.go | 32 ++++++------- schema/mysql/schema.sql | 84 +++++++++++++++-------------------- 8 files changed, 97 insertions(+), 154 deletions(-) delete mode 100644 pkg/schema/container.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 830ff08e..0c8a1bc1 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -72,57 +72,39 @@ func main() { g, ctx := errgroup.WithContext(ctx) forwardUpsertPodsChannel := make(chan database.Entity) - defer close(forwardUpsertPodsChannel) - forwardDeletePodsChannel := make(chan any) + defer close(forwardUpsertPodsChannel) defer close(forwardDeletePodsChannel) g.Go(func() error { return sync.NewSync( - db, - schema.NewNode, - informers.Core().V1().Nodes().Informer(), - logs.GetChildLogger("Nodes"), + db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"), ).Run(ctx) }) g.Go(func() error { return sync.NewSync( - db, - schema.NewNamespace, - informers.Core().V1().Namespaces().Informer(), - logs.GetChildLogger("Namespaces"), + db, schema.NewNamespace, informers.Core().V1().Namespaces().Informer(), logs.GetChildLogger("Namespaces"), ).Run(ctx) }) - forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert) - forwardDeletePodsToLogChannel := make(chan contracts.KDelete) - + podUpserts := make(chan contracts.KUpsert) + podDeletes := make(chan contracts.KDelete) g.Go(func() error { - - defer close(forwardUpsertPodsToLogChannel) - defer close(forwardDeletePodsToLogChannel) + defer close(podUpserts) + defer close(podDeletes) return sync.NewSync( - db, - schema.NewPod, - informers.Core().V1().Pods().Informer(), - logs.GetChildLogger("Pods"), + db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), ).Run( - ctx, - sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel), - sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel), + ctx, sync.WithForwardUpserts(podUpserts), sync.WithForwardDeletes(podDeletes), ) }) logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs")) g.Go(func() error { - return logSync.MaintainList(ctx, forwardUpsertPodsToLogChannel, forwardDeletePodsToLogChannel) - }) - - g.Go(func() error { - return logSync.Run(ctx) + return logSync.Run(ctx, podUpserts, podDeletes) }) if err := g.Wait(); err != nil { diff --git a/go.mod b/go.mod index 4fba6598..08bc7f87 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/pkg/errors v0.9.1 go.uber.org/zap v1.26.0 golang.org/x/sync v0.5.0 - k8s.io/apimachinery v0.28.2 - k8s.io/client-go v0.28.2 + k8s.io/api v0.28.4 + k8s.io/apimachinery v0.28.4 + k8s.io/client-go v0.28.4 + k8s.io/metrics v0.28.4 ) require ( @@ -45,7 +47,7 @@ require ( github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - golang.org/x/net v0.16.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/term v0.13.0 // indirect @@ -53,11 +55,10 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.28.2 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect diff --git a/go.sum b/go.sum index a789518d..e173eff9 100644 --- a/go.sum +++ b/go.sum @@ -43,7 +43,6 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/icinga/icinga-go-library v0.0.0-20231121080432-c03a40718ed9 h1:C+BYgMhhitPxEt9pQ/O/ssQ90mIrm7+YG4KDe9UFFBA= github.com/icinga/icinga-go-library v0.0.0-20231121080432-c03a40718ed9/go.mod h1:Apo85zqPgovShDWxx/TlUN/bfl+RaPviTafT666iJyw= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -124,8 +123,8 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= -golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -165,8 +164,8 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -178,16 +177,18 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.28.2 h1:9mpl5mOb6vXZvqbQmankOfPIGiudghwCoLl1EYfUZbw= -k8s.io/api v0.28.2/go.mod h1:RVnJBsjU8tcMq7C3iaRSGMeaKt2TWEUXcpIt/90fjEg= -k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ= -k8s.io/apimachinery v0.28.2/go.mod h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU= -k8s.io/client-go v0.28.2 h1:DNoYI1vGq0slMBN/SWKMZMw0Rq+0EQW6/AK4v9+3VeY= -k8s.io/client-go v0.28.2/go.mod h1:sMkApowspLuc7omj1FOSUxSoqjr+d5Q0Yc0LOFnYFJY= +k8s.io/api v0.28.4 h1:8ZBrLjwosLl/NYgv1P7EQLqoO8MGQApnbgH8tu3BMzY= +k8s.io/api v0.28.4/go.mod h1:axWTGrY88s/5YE+JSt4uUi6NMM+gur1en2REMR7IRj0= +k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= +k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= +k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= +k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= +k8s.io/metrics v0.28.4 h1:u36fom9+6c8jX2sk8z58H0hFaIUfrPWbXIxN7GT2blk= +k8s.io/metrics v0.28.4/go.mod h1:bBqAJxH20c7wAsTQxDXOlVqxGMdce49d7WNr1WeaLac= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/schema/container.go b/pkg/schema/container.go deleted file mode 100644 index cceefada..00000000 --- a/pkg/schema/container.go +++ /dev/null @@ -1,19 +0,0 @@ -package schema - -// -//import ( -// "github.com/icinga/icinga-kubernetes/pkg/contracts" -// kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -//) -// -//type Container struct { -// kmetaWithNamespace -//} -// -//func NewContainer() contracts.Resource { -// return &Container{} -//} -// -//func (c *Container) Obtain(kobject kmetav1.Object) { -// c.kmetaWithNamespace.Obtain(kobject) -//} diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index f1715efd..1018e65d 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -93,9 +93,7 @@ func (ls *LogSync) removeFromList(id database.ID) { out := make([]*kcorev1.Pod, 0) for _, element := range ls.list { - elementId := sha1.Sum([]byte(element.Namespace + "/" + element.Name)) - if fmt.Sprintf("%x", elementId) != id.String() { out = append(out, element) } @@ -105,10 +103,7 @@ func (ls *LogSync) removeFromList(id database.ID) { } // MaintainList adds pods from the addChannel to the list and deletes pods from the deleteChannel from the list -func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { - - ls.logger.Info("Starting maintain list") - +func (ls *LogSync) maintainList(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { g, ctx := errgroup.WithContext(ctx) deletes := make(chan any) @@ -120,13 +115,12 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts case <-ctx.Done(): return errors.Wrap(ctx.Err(), "context canceled maintain log sync list") - case podFromChannel, more := <-addChannel: + case podFromChannel, more := <-upsertChannel: if !more { return nil } pod := podFromChannel.KObject().(*kcorev1.Pod) - podIsInList := false for _, listPod := range ls.list { @@ -169,27 +163,25 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts // Run starts syncing the logs to the database. Therefore, it loops over all // containers of each pod in the maintained list every 15 seconds. -func (ls *LogSync) Run(ctx context.Context) error { - +func (ls *LogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { ls.logger.Info("Starting sync") g, ctx := errgroup.WithContext(ctx) - upsertStmt := ls.upsertStmt() + g.Go(func() error { + return ls.maintainList(ctx, upsertChannel, deleteChannel) + }) + upsertStmt := ls.upsertStmt() upserts := make(chan database.Entity) defer close(upserts) g.Go(func() error { for { for _, pod := range ls.list { - curPodId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name)) - for _, container := range pod.Spec.Containers { - curContainerId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name + "/" + container.Name)) - podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} if ls.lastChecked[curContainerId] != nil { diff --git a/pkg/sync/spreader.go b/pkg/sync/spreader.go index 2115550a..c0577365 100644 --- a/pkg/sync/spreader.go +++ b/pkg/sync/spreader.go @@ -5,11 +5,11 @@ import ( "sync/atomic" ) -// ChannelSpreader takes a channel of type T and fans it out to an array of other channels of type T +// ChannelSpreader takes a channel of type T and fans it out to an array of other addedChannels of type T type ChannelSpreader[T any] struct { channelToBreak <-chan T createdChannels []chan<- T - channels []chan<- T + addedChannels []chan<- T started atomic.Bool } @@ -20,9 +20,9 @@ func NewChannelSpreader[T any](channelToBreak <-chan T) *ChannelSpreader[T] { } } -// NewChannel returns and adds new output channel to the list of created channels +// NewChannel returns and adds new output channel to the list of created addedChannels func (cs *ChannelSpreader[T]) NewChannel() <-chan T { - if cs.started.Load() == true { + if cs.started.Load() { panic("ChannelSpreader already started") } @@ -32,18 +32,17 @@ func (cs *ChannelSpreader[T]) NewChannel() <-chan T { return channel } -// AddChannel adds given output channel to the list of added channels +// AddChannel adds given output channel to the list of added addedChannels func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) { - if cs.started.Load() == true { + if cs.started.Load() { panic("ChannelSpreader already started") } - cs.channels = append(cs.channels, channel) + cs.addedChannels = append(cs.addedChannels, channel) } -// Run combines the lists and starts fanning out the channel to the channels from the list +// Run combines the lists and starts fanning out the channel to the addedChannels from the list func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { - cs.started.Store(true) defer func() { @@ -52,7 +51,7 @@ func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { } }() - cs.channels = append(cs.channels, cs.createdChannels...) + channels := append(cs.addedChannels, cs.createdChannels...) for { select { @@ -61,14 +60,13 @@ func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { return nil } - for _, channel := range cs.channels { + for _, channel := range channels { select { case channel <- spread: case <-ctx.Done(): return ctx.Err() } } - case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 5c587010..d3c2b6be 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -41,33 +41,33 @@ func NewSync( return s } -func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) SyncOption { +func WithForwardUpserts(channel chan<- contracts.KUpsert) SyncOption { return func(options *SyncOptions) { - options.forwardUpsertToLogChannel = channel + options.forwardUpserts = channel } } -func WithForwardDeleteToLog(channel chan<- contracts.KDelete) SyncOption { +func WithForwardDeletes(channel chan<- contracts.KDelete) SyncOption { return func(options *SyncOptions) { - options.forwardDeleteToLogChannel = channel + options.forwardDeletes = channel } } type SyncOption func(options *SyncOptions) type SyncOptions struct { - forwardUpsertToLogChannel chan<- contracts.KUpsert - forwardDeleteToLogChannel chan<- contracts.KDelete + forwardUpserts chan<- contracts.KUpsert + forwardDeletes chan<- contracts.KDelete } -func NewOptionStorage(execOptions ...SyncOption) *SyncOptions { - optionStorage := &SyncOptions{} +func NewSyncOptions(options ...SyncOption) *SyncOptions { + syncOptions := &SyncOptions{} - for _, option := range execOptions { - option(optionStorage) + for _, option := range options { + option(syncOptions) } - return optionStorage + return syncOptions } func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { @@ -98,7 +98,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.factory().GetResourceVersion() - syncOptions := NewOptionStorage(execOptions...) + syncOptions := NewSyncOptions(execOptions...) // init upsert channel spreader multiplexUpsertChannel := make(chan contracts.KUpsert) @@ -108,8 +108,8 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { upsertChannel := multiplexUpsert.NewChannel() - if syncOptions.forwardUpsertToLogChannel != nil { - multiplexUpsert.AddChannel(syncOptions.forwardUpsertToLogChannel) + if syncOptions.forwardUpserts != nil { + multiplexUpsert.AddChannel(syncOptions.forwardUpserts) } // run upsert channel spreader @@ -182,8 +182,8 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { deleteChannel := multiplexDelete.NewChannel() - if syncOptions.forwardDeleteToLogChannel != nil { - multiplexDelete.AddChannel(syncOptions.forwardDeleteToLogChannel) + if syncOptions.forwardDeletes != nil { + multiplexDelete.AddChannel(syncOptions.forwardDeletes) } // run delete channel spreader diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 93aebbb1..f5ca1400 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1,51 +1,39 @@ -CREATE TABLE node -( - id BINARY(20) NOT NULL, - canonical_name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, - name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, - uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version VARCHAR(255) NOT NULL, - created BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (id) -) ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_bin; +CREATE TABLE node ( + id binary(20) NOT NULL, + canonical_name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, + name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, + uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version varchar(255) NOT NULL, + created bigint unsigned NOT NULL, + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE namespace -( - id BINARY(20) NOT NULL, - canonical_name VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, - name VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, - uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version VARCHAR(255) NOT NULL, - created BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (id) -) ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_bin; +CREATE TABLE namespace ( + id binary(20) NOT NULL, + canonical_name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, + name varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, + uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version varchar(255) NOT NULL, + created bigint unsigned NOT NULL, + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE pod -( - id BINARY(20) NOT NULL, - canonical_name VARCHAR(317) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'namespace/name', - namespace VARCHAR(63) COLLATE utf8mb4_unicode_ci NOT NULL, - name VARCHAR(253) COLLATE utf8mb4_unicode_ci NOT NULL, - uid VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, - resource_version VARCHAR(255) NOT NULL, - created BIGINT UNSIGNED NOT NULL, - PRIMARY KEY (id) -) ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_bin; +CREATE TABLE pod ( + id binary(20) NOT NULL, + canonical_name varchar(317) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'namespace/name', + namespace varchar(63) COLLATE utf8mb4_unicode_ci NOT NULL, + name varchar(253) COLLATE utf8mb4_unicode_ci NOT NULL, + uid varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + resource_version varchar(255) NOT NULL, + created bigint unsigned NOT NULL, + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE log -( - id BINARY(20) NOT NULL, - reference_id BINARY(20) NOT NULL, - container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, - time LONGTEXT NOT NULL, - log LONGTEXT NOT NULL, - PRIMARY KEY (id) -) ENGINE = InnoDB - DEFAULT CHARSET = utf8mb4 - COLLATE = utf8mb4_bin; +CREATE TABLE log ( + id binary(20) NOT NULL, + reference_id binary(20) NOT NULL, + container_name varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + time longtext NOT NULL, + log longtext NOT NULL, + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; From bbed6169bac4e2613f83fa833c82b6ef526b2525 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Wed, 29 Nov 2023 12:09:08 +0100 Subject: [PATCH 10/12] Fix review --- cmd/icinga-kubernetes/main.go | 7 +- pkg/schema/log.go | 13 +-- pkg/sync/channel-mux.go | 97 +++++++++++++++++++++ pkg/sync/logs.go | 156 +++++++++++++++++----------------- pkg/sync/spreader.go | 74 ---------------- pkg/sync/sync.go | 12 +-- schema/mysql/schema.sql | 9 +- 7 files changed, 192 insertions(+), 176 deletions(-) create mode 100644 pkg/sync/channel-mux.go delete mode 100644 pkg/sync/spreader.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 0c8a1bc1..9ac8e6bd 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -71,11 +71,6 @@ func main() { g, ctx := errgroup.WithContext(ctx) - forwardUpsertPodsChannel := make(chan database.Entity) - forwardDeletePodsChannel := make(chan any) - defer close(forwardUpsertPodsChannel) - defer close(forwardDeletePodsChannel) - g.Go(func() error { return sync.NewSync( db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"), @@ -101,7 +96,7 @@ func main() { ) }) - logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs")) + logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs")) g.Go(func() error { return logSync.Run(ctx, podUpserts, podDeletes) diff --git a/pkg/schema/log.go b/pkg/schema/log.go index 1ed69925..1f3b7abd 100644 --- a/pkg/schema/log.go +++ b/pkg/schema/log.go @@ -1,10 +1,11 @@ package schema -type Log struct { +import "github.com/icinga/icinga-go-library/types" + +type ContainerLog struct { kmetaWithoutNamespace - Id []byte - ReferenceId []byte - ContainerName string - Time string - Log string + ContainerId types.Binary + PodId types.Binary + Time string + Log string } diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go new file mode 100644 index 00000000..d2f9ceda --- /dev/null +++ b/pkg/sync/channel-mux.go @@ -0,0 +1,97 @@ +package sync + +import ( + "context" + "sync/atomic" +) + +// ChannelMux is a multiplexer for channels of variable types. +// It fans all input channels to all output channels. +type ChannelMux[T any] interface { + + // AddInChannel adds given input channel to the list of input channels. + AddInChannel(<-chan T) + + // NewOutChannel returns and adds new output channel to the pods of created addedOutChannels. + NewOutChannel() <-chan T + + // AddOutChannel adds given output channel to the list of added addedOutChannels. + AddOutChannel(chan<- T) + + // Run combines output channel lists and starts multiplexing. + Run(context.Context) error +} + +type channelMux[T any] struct { + inChannels []<-chan T + createdOutChannels []chan<- T + addedOutChannels []chan<- T + started atomic.Bool +} + +// NewChannelMux creates new ChannelMux initialized with at least one input channel +func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { + return &channelMux[T]{ + inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...), + } +} + +func (mux *channelMux[T]) AddInChannel(channel <-chan T) { + if mux.started.Load() { + panic("channelMux already started") + } + + mux.inChannels = append(mux.inChannels, channel) +} + +func (mux *channelMux[T]) NewOutChannel() <-chan T { + if mux.started.Load() { + panic("channelMux already started") + } + + channel := make(chan T) + mux.createdOutChannels = append(mux.createdOutChannels, channel) + + return channel +} + +func (mux *channelMux[T]) AddOutChannel(channel chan<- T) { + if mux.started.Load() { + panic("channelMux already started") + } + + mux.addedOutChannels = append(mux.addedOutChannels, channel) +} + +func (mux *channelMux[T]) Run(ctx context.Context) error { + mux.started.Store(true) + + defer func() { + for _, channelToClose := range mux.createdOutChannels { + close(channelToClose) + } + }() + + outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) + + for { + for _, inChannel := range mux.inChannels { + select { + case spread, more := <-inChannel: + if !more { + return nil + } + + for _, outChannel := range outChannels { + select { + case outChannel <- spread: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() + } + } + } +} diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index 1018e65d..ef0935d5 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/schema" "github.com/pkg/errors" @@ -15,50 +16,58 @@ import ( kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "slices" "strings" msync "sync" "time" ) -// LogSync syncs logs to database. Therefore, it maintains a list -// of pod elements to get logs from -type LogSync struct { - list []*kcorev1.Pod - lastChecked map[[20]byte]*kmetav1.Time - mutex *msync.RWMutex - clientset *kubernetes.Clientset - db *database.DB - logger *logging.Logger +type podListItem struct { + pod *kcorev1.Pod + lastTimestamps map[[20]byte]*kmetav1.Time } -// NewLogSync creates new LogSync initialized with clientset, database and logger -func NewLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) *LogSync { - return &LogSync{ - list: []*kcorev1.Pod{}, - lastChecked: make(map[[20]byte]*kmetav1.Time), - mutex: &msync.RWMutex{}, - clientset: clientset, - db: db, - logger: logger, +// ContainerLogSync reacts to pod changes and syncs container logs to database. +// On pod add/updates ContainerLogSync starts syncing. On pod deletes syncing stops. +// Container logs are periodic fetched from Kubernetes API. +type ContainerLogSync interface { + // Run starts the ContainerLogSync. + Run(context.Context, <-chan contracts.KUpsert, <-chan contracts.KDelete) error +} + +// containerLogSync syncs container logs to database. +type containerLogSync struct { + pods map[[20]byte]podListItem + mutex *msync.RWMutex + clientset *kubernetes.Clientset + db *database.DB + logger *logging.Logger +} + +// NewContainerLogSync creates new containerLogSync initialized with clientset, database and logger. +func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) ContainerLogSync { + return &containerLogSync{ + pods: make(map[[20]byte]podListItem), + mutex: &msync.RWMutex{}, + clientset: clientset, + db: db, + logger: logger, } } -// upsertStmt returns database upsert statement -func (ls *LogSync) upsertStmt() string { +// upsertStmt returns a database statement to upsert a container log. +func (ls *containerLogSync) upsertStmt() string { return fmt.Sprintf( "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", - "log", - "id, reference_id, container_name, time, log", - ":id, :reference_id, :container_name, :time, :log", + "container_log", + "container_id, pod_id, time, log", + ":container_id, :pod_id, :time, :log", "time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)", ) } -// splitTimestampsFromMessages takes a log as []byte and returns timestamps and messages as separate string slices. -// Additionally, it updates the last checked timestamp for the log -func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]byte) (times []string, messages []string, err error) { - +// splitTimestampsFromMessages takes a log and returns timestamps and messages as separate parts. +// Additionally, it updates the last checked timestamp for the container log. +func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId [20]byte, curContainerId [20]byte) (times []string, messages []string, err error) { stringReader := strings.NewReader(string(log)) reader := bufio.NewReader(stringReader) @@ -77,7 +86,7 @@ func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]by continue } - if ls.lastChecked[curContainerId] != nil && messageTime.UnixNano() <= ls.lastChecked[curContainerId].UnixNano() { + if ls.pods[curPodId].lastTimestamps[curContainerId] != nil && messageTime.UnixNano() <= ls.pods[curPodId].lastTimestamps[curContainerId].UnixNano() { continue } @@ -88,22 +97,8 @@ func (ls *LogSync) splitTimestampsFromMessages(log []byte, curContainerId [20]by return times, messages, nil } -// removeFromList removes pod from maintained list -func (ls *LogSync) removeFromList(id database.ID) { - out := make([]*kcorev1.Pod, 0) - - for _, element := range ls.list { - elementId := sha1.Sum([]byte(element.Namespace + "/" + element.Name)) - if fmt.Sprintf("%x", elementId) != id.String() { - out = append(out, element) - } - } - - ls.list = out -} - -// MaintainList adds pods from the addChannel to the list and deletes pods from the deleteChannel from the list -func (ls *LogSync) maintainList(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +// maintainList updates pods depending on the objects coming in via upsert and delete channel. +func (ls *containerLogSync) maintainList(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { g, ctx := errgroup.WithContext(ctx) deletes := make(chan any) @@ -113,7 +108,7 @@ func (ls *LogSync) maintainList(ctx context.Context, upsertChannel <-chan contra for { select { case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "context canceled maintain log sync list") + return errors.Wrap(ctx.Err(), "context canceled maintain log sync pods") case podFromChannel, more := <-upsertChannel: if !more { @@ -121,20 +116,16 @@ func (ls *LogSync) maintainList(ctx context.Context, upsertChannel <-chan contra } pod := podFromChannel.KObject().(*kcorev1.Pod) - podIsInList := false + podId := sha1.Sum(types.Checksum(podFromChannel.ID().String())) - for _, listPod := range ls.list { - if listPod.UID == pod.UID { - podIsInList = true - } - } + _, ok := ls.pods[podId] - if podIsInList { + if ok { continue } ls.mutex.RLock() - ls.list = append(ls.list, pod) + ls.pods[podId] = podListItem{pod: pod} ls.mutex.RUnlock() case podIdFromChannel, more := <-deleteChannel: @@ -142,28 +133,30 @@ func (ls *LogSync) maintainList(ctx context.Context, upsertChannel <-chan contra return nil } - idOfPod := podIdFromChannel.ID() + podId := sha1.Sum(types.Checksum(podIdFromChannel.ID().String())) ls.mutex.RLock() - ls.removeFromList(idOfPod) + delete(ls.pods, podId) ls.mutex.RUnlock() - deletes <- idOfPod + select { + case deletes <- podId: + case <-ctx.Done(): + return ctx.Err() + } } } }) g.Go(func() error { - return database.NewDelete(ls.db).ByColumn("reference_id").Stream(ctx, &schema.Log{}, deletes) + return database.NewDelete(ls.db).ByColumn("container_id").Stream(ctx, &schema.ContainerLog{}, deletes) }) return g.Wait() } -// Run starts syncing the logs to the database. Therefore, it loops over all -// containers of each pod in the maintained list every 15 seconds. -func (ls *LogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +func (ls *containerLogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { ls.logger.Info("Starting sync") g, ctx := errgroup.WithContext(ctx) @@ -178,23 +171,23 @@ func (ls *LogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpse g.Go(func() error { for { - for _, pod := range ls.list { - curPodId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name)) - for _, container := range pod.Spec.Containers { - curContainerId := sha1.Sum([]byte(pod.Namespace + "/" + pod.Name + "/" + container.Name)) + for _, element := range ls.pods { + podId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) + for _, container := range element.pod.Spec.Containers { + containerId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} - if ls.lastChecked[curContainerId] != nil { - podLogOpts.SinceTime = ls.lastChecked[curContainerId] + if ls.pods[podId].lastTimestamps != nil { + podLogOpts.SinceTime = ls.pods[podId].lastTimestamps[containerId] } - log, err := ls.clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts).Do(ctx).Raw() + log, err := ls.clientset.CoreV1().Pods(element.pod.Namespace).GetLogs(element.pod.Name, &podLogOpts).Do(ctx).Raw() if err != nil { fmt.Println(errors.Wrap(err, "error reading container log")) continue } - times, messages, err := ls.splitTimestampsFromMessages(log, curContainerId) + times, messages, err := ls.splitTimestampsFromMessages(log, podId, containerId) if err != nil { return err } @@ -203,27 +196,32 @@ func (ls *LogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpse continue } - newLog := &schema.Log{ - Id: curContainerId[:], - ReferenceId: curPodId[:], - ContainerName: container.Name, - Time: strings.Join(times, "\n"), - Log: strings.Join(messages, "\n"), + newLog := &schema.ContainerLog{ + ContainerId: containerId[:], + PodId: podId[:], + Time: strings.Join(times, "\n"), + Log: strings.Join(messages, "\n"), } - upserts <- newLog + select { + case upserts <- newLog: + case <-ctx.Done(): + return ctx.Err() + + } lastTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", times[len(times)-1]) if err != nil { return errors.Wrap(err, "error parsing log time") } - if !slices.Contains(ls.list, pod) { + lastV1Time := kmetav1.Time{Time: lastTime} + + if _, ok := ls.pods[podId]; !ok { continue } - lastV1Time := kmetav1.Time{Time: lastTime} - ls.lastChecked[curContainerId] = &lastV1Time + ls.pods[podId].lastTimestamps[containerId] = &lastV1Time } } diff --git a/pkg/sync/spreader.go b/pkg/sync/spreader.go deleted file mode 100644 index c0577365..00000000 --- a/pkg/sync/spreader.go +++ /dev/null @@ -1,74 +0,0 @@ -package sync - -import ( - "context" - "sync/atomic" -) - -// ChannelSpreader takes a channel of type T and fans it out to an array of other addedChannels of type T -type ChannelSpreader[T any] struct { - channelToBreak <-chan T - createdChannels []chan<- T - addedChannels []chan<- T - started atomic.Bool -} - -// NewChannelSpreader creates new ChannelSpreader initialized with the channel to break -func NewChannelSpreader[T any](channelToBreak <-chan T) *ChannelSpreader[T] { - return &ChannelSpreader[T]{ - channelToBreak: channelToBreak, - } -} - -// NewChannel returns and adds new output channel to the list of created addedChannels -func (cs *ChannelSpreader[T]) NewChannel() <-chan T { - if cs.started.Load() { - panic("ChannelSpreader already started") - } - - channel := make(chan T) - cs.createdChannels = append(cs.createdChannels, channel) - - return channel -} - -// AddChannel adds given output channel to the list of added addedChannels -func (cs *ChannelSpreader[T]) AddChannel(channel chan<- T) { - if cs.started.Load() { - panic("ChannelSpreader already started") - } - - cs.addedChannels = append(cs.addedChannels, channel) -} - -// Run combines the lists and starts fanning out the channel to the addedChannels from the list -func (cs *ChannelSpreader[T]) Run(ctx context.Context) error { - cs.started.Store(true) - - defer func() { - for _, channelToClose := range cs.createdChannels { - close(channelToClose) - } - }() - - channels := append(cs.addedChannels, cs.createdChannels...) - - for { - select { - case spread, more := <-cs.channelToBreak: - if !more { - return nil - } - - for _, channel := range channels { - select { - case channel <- spread: - case <-ctx.Done(): - return ctx.Err() - } - } - case <-ctx.Done(): - return ctx.Err() - } - } -} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index d3c2b6be..e04b5125 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -104,12 +104,12 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { multiplexUpsertChannel := make(chan contracts.KUpsert) defer close(multiplexUpsertChannel) - multiplexUpsert := NewChannelSpreader[contracts.KUpsert](multiplexUpsertChannel) + multiplexUpsert := NewChannelMux(multiplexUpsertChannel) - upsertChannel := multiplexUpsert.NewChannel() + upsertChannel := multiplexUpsert.NewOutChannel() if syncOptions.forwardUpserts != nil { - multiplexUpsert.AddChannel(syncOptions.forwardUpserts) + multiplexUpsert.AddOutChannel(syncOptions.forwardUpserts) } // run upsert channel spreader @@ -178,12 +178,12 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { multiplexDeleteChannel := make(chan contracts.KDelete) defer close(multiplexDeleteChannel) - multiplexDelete := NewChannelSpreader[contracts.KDelete](multiplexDeleteChannel) + multiplexDelete := NewChannelMux(multiplexDeleteChannel) - deleteChannel := multiplexDelete.NewChannel() + deleteChannel := multiplexDelete.NewOutChannel() if syncOptions.forwardDeletes != nil { - multiplexDelete.AddChannel(syncOptions.forwardDeletes) + multiplexDelete.AddOutChannel(syncOptions.forwardDeletes) } // run delete channel spreader diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f5ca1400..e6a90f42 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -29,11 +29,10 @@ CREATE TABLE pod ( PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE log ( - id binary(20) NOT NULL, - reference_id binary(20) NOT NULL, - container_name varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, +CREATE TABLE container_log ( + container_id binary(20) NOT NULL, + pod_id binary(20) NOT NULL, time longtext NOT NULL, log longtext NOT NULL, - PRIMARY KEY (id) + PRIMARY KEY (container_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; From 90a7bb3e077eb3db52531a911d8b72f83adeed02 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Tue, 28 Nov 2023 16:16:04 +0100 Subject: [PATCH 11/12] WIP: Unit tests for channel multiplexer --- pkg/sync/channel-mux.go | 33 +++++- pkg/sync/channel-mux_test.go | 207 +++++++++++++++++++++++++++++++++++ 2 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 pkg/sync/channel-mux_test.go diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go index d2f9ceda..69635ba9 100644 --- a/pkg/sync/channel-mux.go +++ b/pkg/sync/channel-mux.go @@ -2,6 +2,7 @@ package sync import ( "context" + "golang.org/x/sync/errgroup" "sync/atomic" ) @@ -74,10 +75,32 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) - for { - for _, inChannel := range mux.inChannels { + sink := make(chan T) + + g, ctx := errgroup.WithContext(ctx) + + for _, ch := range mux.inChannels { + ch := ch + + g.Go(func() error { + for { + select { + case spread, more := <-ch: + if !more { + return nil + } + sink <- spread + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + g.Go(func() error { + for { select { - case spread, more := <-inChannel: + case spread, more := <-sink: if !more { return nil } @@ -93,5 +116,7 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { return ctx.Err() } } - } + }) + + return g.Wait() } diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go new file mode 100644 index 00000000..37e90b6e --- /dev/null +++ b/pkg/sync/channel-mux_test.go @@ -0,0 +1,207 @@ +package sync + +import ( + "context" + "golang.org/x/sync/errgroup" + "testing" + "time" +) + +type outputTest struct { + arg1, want int +} + +var outputTests = []outputTest{ + {0, 0}, + {5, 5}, + {35253, 35253}, + {999999, 999999}, + {-7, -7}, +} + +func TestAddedOutputChannels(t *testing.T) { + for _, test := range outputTests { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := make(chan int) + outputChannel2 := make(chan int) + outputChannel3 := make(chan int) + multiplexer.AddOutChannel(outputChannel1) + multiplexer.AddOutChannel(outputChannel2) + multiplexer.AddOutChannel(outputChannel3) + + g, ctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + multiplexChannel <- test.arg1 + + if got := <-outputChannel1; got != test.want { + t.Errorf("got '%d' for 1st test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel2; got != test.want { + t.Errorf("got '%d' for 2nd test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel3; got != test.want { + t.Errorf("got '%d' for 3rd test channel, wanted '%d'", got, test.want) + } + } +} + +func TestCreatedOutputChannels(t *testing.T) { + for _, test := range outputTests { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := multiplexer.NewOutChannel() + outputChannel2 := multiplexer.NewOutChannel() + outputChannel3 := multiplexer.NewOutChannel() + + g, ctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + multiplexChannel <- test.arg1 + + if got := <-outputChannel1; got != test.want { + t.Errorf("got '%d' for 1st test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel2; got != test.want { + t.Errorf("got '%d' for 2nd test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel3; got != test.want { + t.Errorf("got '%d' for 3rd test channel, wanted '%d'", got, test.want) + } + } +} + +type inputTest struct { + arg1, arg2, arg3, want int +} + +var inputTests = []inputTest{ + {0, 0, 0, 0}, + {1, 2, 3, 6}, + {535, 64, 6432, 7031}, + {353632, 636232, 64674, 1054538}, + {-1, -2, -3, -6}, +} + +func TestAddedInputChannels(t *testing.T) { + for _, test := range inputTests { + multiplexChannel1 := make(chan int) + multiplexChannel2 := make(chan int) + multiplexChannel3 := make(chan int) + + multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) + + outputChannel := multiplexer.NewOutChannel() + + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + g.Go(func() error { + select { + case multiplexChannel1 <- test.arg1: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case multiplexChannel2 <- test.arg2: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case multiplexChannel3 <- test.arg3: + case <-ctx.Done(): + return ctx.Err() + } + + close(multiplexChannel1) + close(multiplexChannel2) + close(multiplexChannel3) + + return nil + }) + + stop := false + got := 0 + + g.Go(func() error { + for !stop { + select { + case output, more := <-outputChannel: + if !more { + stop = true + break + } + + got += output + case <-time.After(time.Second * 1): + stop = true + break + case <-ctx.Done(): + return ctx.Err() + } + } + + if got != test.want { + t.Errorf("Got %d, wanted %d", got, test.want) + } + + cancel() + + return nil + }) + + g.Wait() + } +} + +func TestClosedChannels(t *testing.T) { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := multiplexer.NewOutChannel() + outputChannel2 := multiplexer.NewOutChannel() + outputChannel3 := multiplexer.NewOutChannel() + + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + cancel() + + select { + case <-outputChannel1: + case <-time.After(time.Second): + t.Error("1st channel is still open, should be closed") + } + + select { + case <-outputChannel2: + case <-time.After(time.Second): + t.Error("2nd channel is still open, should be closed") + } + + select { + case <-outputChannel3: + case <-time.After(time.Second): + t.Error("3rd channel is still open, should be closed") + } + +} From 396b3ad42e10070b0b17fbc3486f30d1e5ee15c1 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Thu, 30 Nov 2023 10:15:07 +0100 Subject: [PATCH 12/12] Fix review --- cmd/icinga-kubernetes/main.go | 9 +- pkg/sync/channel-mux.go | 68 +++++++------- pkg/sync/channel-mux_test.go | 20 ++-- pkg/sync/logs.go | 148 +++++++++++++++--------------- pkg/sync/options.go | 37 ++++++++ pkg/sync/sync.go | 168 +++++++++------------------------- 6 files changed, 206 insertions(+), 244 deletions(-) create mode 100644 pkg/sync/options.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 9ac8e6bd..f56e5222 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -16,6 +16,7 @@ import ( kinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kclientcmd "k8s.io/client-go/tools/clientcmd" + "time" ) func main() { @@ -96,10 +97,12 @@ func main() { ) }) - logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs")) - g.Go(func() error { - return logSync.Run(ctx, podUpserts, podDeletes) + return sync.NewContainerLogSync( + k, db, logs.GetChildLogger("ContainerLogs"), time.Second*15, + ).Run( + ctx, podUpserts, podDeletes, + ) }) if err := g.Wait(); err != nil { diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go index 69635ba9..2e05f73f 100644 --- a/pkg/sync/channel-mux.go +++ b/pkg/sync/channel-mux.go @@ -7,79 +7,79 @@ import ( ) // ChannelMux is a multiplexer for channels of variable types. -// It fans all input channels to all output channels. +// It fans out all input channels to all output channels. type ChannelMux[T any] interface { + // In adds the given input channel reading. + In(<-chan T) - // AddInChannel adds given input channel to the list of input channels. - AddInChannel(<-chan T) + // Out returns a new output channel that receives from all input channels. + Out() <-chan T - // NewOutChannel returns and adds new output channel to the pods of created addedOutChannels. - NewOutChannel() <-chan T + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan<- T) - // AddOutChannel adds given output channel to the list of added addedOutChannels. - AddOutChannel(chan<- T) - - // Run combines output channel lists and starts multiplexing. + // Run starts multiplexing of all input channels to all output channels. Run(context.Context) error } type channelMux[T any] struct { - inChannels []<-chan T - createdOutChannels []chan<- T - addedOutChannels []chan<- T - started atomic.Bool + in []<-chan T + out []chan<- T + outAdded []chan<- T + started atomic.Bool } -// NewChannelMux creates new ChannelMux initialized with at least one input channel -func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { +// NewChannelMux returns a new ChannelMux initialized with at least one input channel. +func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { return &channelMux[T]{ - inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...), + in: append(inChannels, inChannel), } } -func (mux *channelMux[T]) AddInChannel(channel <-chan T) { +func (mux *channelMux[T]) In(channel <-chan T) { if mux.started.Load() { panic("channelMux already started") } - mux.inChannels = append(mux.inChannels, channel) + mux.in = append(mux.in, channel) } -func (mux *channelMux[T]) NewOutChannel() <-chan T { +func (mux *channelMux[T]) Out() <-chan T { if mux.started.Load() { panic("channelMux already started") } channel := make(chan T) - mux.createdOutChannels = append(mux.createdOutChannels, channel) + mux.out = append(mux.out, channel) return channel } -func (mux *channelMux[T]) AddOutChannel(channel chan<- T) { +func (mux *channelMux[T]) AddOut(channel chan<- T) { if mux.started.Load() { panic("channelMux already started") } - mux.addedOutChannels = append(mux.addedOutChannels, channel) + mux.outAdded = append(mux.outAdded, channel) } func (mux *channelMux[T]) Run(ctx context.Context) error { - mux.started.Store(true) + if mux.started.Swap(true) { + panic("channelMux already started") + } defer func() { - for _, channelToClose := range mux.createdOutChannels { + for _, channelToClose := range mux.out { close(channelToClose) } }() - outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) + g, ctx := errgroup.WithContext(ctx) sink := make(chan T) + defer close(sink) - g, ctx := errgroup.WithContext(ctx) - - for _, ch := range mux.inChannels { + for _, ch := range mux.in { ch := ch g.Go(func() error { @@ -89,7 +89,12 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { if !more { return nil } - sink <- spread + select { + case sink <- spread: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): return ctx.Err() } @@ -97,6 +102,7 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { }) } + outs := append(mux.outAdded, mux.out...) g.Go(func() error { for { select { @@ -105,9 +111,9 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { return nil } - for _, outChannel := range outChannels { + for _, ch := range outs { select { - case outChannel <- spread: + case ch <- spread: case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go index 37e90b6e..467c527d 100644 --- a/pkg/sync/channel-mux_test.go +++ b/pkg/sync/channel-mux_test.go @@ -27,9 +27,9 @@ func TestAddedOutputChannels(t *testing.T) { outputChannel1 := make(chan int) outputChannel2 := make(chan int) outputChannel3 := make(chan int) - multiplexer.AddOutChannel(outputChannel1) - multiplexer.AddOutChannel(outputChannel2) - multiplexer.AddOutChannel(outputChannel3) + multiplexer.AddOut(outputChannel1) + multiplexer.AddOut(outputChannel2) + multiplexer.AddOut(outputChannel3) g, ctx := errgroup.WithContext(context.Background()) @@ -56,9 +56,9 @@ func TestCreatedOutputChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() g, ctx := errgroup.WithContext(context.Background()) @@ -100,7 +100,7 @@ func TestAddedInputChannels(t *testing.T) { multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) - outputChannel := multiplexer.NewOutChannel() + outputChannel := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) @@ -173,9 +173,9 @@ func TestClosedChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index ef0935d5..07d9d96f 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -3,7 +3,6 @@ package sync import ( "bufio" "context" - "crypto/sha1" "fmt" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" @@ -16,42 +15,46 @@ import ( kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "strconv" "strings" - msync "sync" + gosync "sync" "time" ) -type podListItem struct { - pod *kcorev1.Pod - lastTimestamps map[[20]byte]*kmetav1.Time -} - -// ContainerLogSync reacts to pod changes and syncs container logs to database. -// On pod add/updates ContainerLogSync starts syncing. On pod deletes syncing stops. -// Container logs are periodic fetched from Kubernetes API. +// ContainerLogSync reacts to pod changes and synchronizes container logs +// with the database. When a pod is added/updated, ContainerLogSync starts +// synchronizing its containers. When a pod is deleted, synchronization stops. +// Container logs are periodically fetched from the Kubernetes API. type ContainerLogSync interface { // Run starts the ContainerLogSync. Run(context.Context, <-chan contracts.KUpsert, <-chan contracts.KDelete) error } +// NewContainerLogSync creates new ContainerLogSync initialized with clientset, database and logger. +func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger, period time.Duration) ContainerLogSync { + return &containerLogSync{ + pods: make(map[string]podListItem), + mutex: &gosync.RWMutex{}, + clientset: clientset, + db: db, + logger: logger, + period: period, + } +} + // containerLogSync syncs container logs to database. type containerLogSync struct { - pods map[[20]byte]podListItem - mutex *msync.RWMutex + pods map[string]podListItem + mutex *gosync.RWMutex clientset *kubernetes.Clientset db *database.DB logger *logging.Logger + period time.Duration } -// NewContainerLogSync creates new containerLogSync initialized with clientset, database and logger. -func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) ContainerLogSync { - return &containerLogSync{ - pods: make(map[[20]byte]podListItem), - mutex: &msync.RWMutex{}, - clientset: clientset, - db: db, - logger: logger, - } +type podListItem struct { + pod *kcorev1.Pod + lastTimestamps map[string]*kmetav1.Time } // upsertStmt returns a database statement to upsert a container log. @@ -65,129 +68,135 @@ func (ls *containerLogSync) upsertStmt() string { ) } -// splitTimestampsFromMessages takes a log and returns timestamps and messages as separate parts. -// Additionally, it updates the last checked timestamp for the container log. -func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId [20]byte, curContainerId [20]byte) (times []string, messages []string, err error) { +// splitTimestampsFromMessages takes a log line and returns timestamps and messages as separate parts. +func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId string, curContainerId string) (times []string, messages []string, newLastTimestamp time.Time, returnErr error) { stringReader := strings.NewReader(string(log)) reader := bufio.NewReader(stringReader) + var parsedTimestamp time.Time + for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { break } - return nil, nil, errors.Wrap(err, "error reading log message") + + returnErr = errors.Wrap(err, "error reading log message") + return } - messageTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", strings.Split(line, " ")[0]) + timestamp, message, _ := strings.Cut(line, " ") + + parsedTimestamp, err = time.Parse("2006-01-02T15:04:05.999999999Z", timestamp) if err != nil { - logging.Fatal(errors.Wrap(err, "error parsing log timestamp")) + ls.logger.Fatal(errors.Wrap(err, "error parsing log timestamp")) continue } - if ls.pods[curPodId].lastTimestamps[curContainerId] != nil && messageTime.UnixNano() <= ls.pods[curPodId].lastTimestamps[curContainerId].UnixNano() { + if lastTimestamp, ok := ls.pods[curPodId].lastTimestamps[curContainerId]; ok && + (parsedTimestamp.Before(lastTimestamp.Time) || parsedTimestamp.Equal(lastTimestamp.Time)) { continue } - times = append(times, strings.Split(line, " ")[0]) - messages = append(messages, strings.Join(strings.Split(line, " ")[1:], " ")) + times = append(times, strconv.FormatInt(parsedTimestamp.UnixMilli(), 10)) + messages = append(messages, message) } - return times, messages, nil + newLastTimestamp = parsedTimestamp + + return } // maintainList updates pods depending on the objects coming in via upsert and delete channel. -func (ls *containerLogSync) maintainList(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +func (ls *containerLogSync) maintainList(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { g, ctx := errgroup.WithContext(ctx) - deletes := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deletes) + defer close(databaseDeletes) for { select { - case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "context canceled maintain log sync pods") - - case podFromChannel, more := <-upsertChannel: + case kupsert, more := <-kupserts: if !more { return nil } - pod := podFromChannel.KObject().(*kcorev1.Pod) - podId := sha1.Sum(types.Checksum(podFromChannel.ID().String())) + podId := kupsert.ID().String() - _, ok := ls.pods[podId] - - if ok { + if _, ok := ls.pods[podId]; ok { continue } ls.mutex.RLock() - ls.pods[podId] = podListItem{pod: pod} + ls.pods[podId] = podListItem{ + pod: kupsert.KObject().(*kcorev1.Pod), + lastTimestamps: make(map[string]*kmetav1.Time), + } ls.mutex.RUnlock() - case podIdFromChannel, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } - podId := sha1.Sum(types.Checksum(podIdFromChannel.ID().String())) + podId := kdelete.ID().String() ls.mutex.RLock() delete(ls.pods, podId) ls.mutex.RUnlock() select { - case deletes <- podId: + case databaseDeletes <- podId: case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } } }) g.Go(func() error { - return database.NewDelete(ls.db).ByColumn("container_id").Stream(ctx, &schema.ContainerLog{}, deletes) + return database.NewDelete(ls.db, database.ByColumn("container_id")).Stream(ctx, &schema.ContainerLog{}, databaseDeletes) }) return g.Wait() } -func (ls *containerLogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +func (ls *containerLogSync) Run(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { ls.logger.Info("Starting sync") g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return ls.maintainList(ctx, upsertChannel, deleteChannel) + return ls.maintainList(ctx, kupserts, kdeletes) }) - upsertStmt := ls.upsertStmt() - upserts := make(chan database.Entity) - defer close(upserts) + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) g.Go(func() error { for { for _, element := range ls.pods { - podId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) + podId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) for _, container := range element.pod.Spec.Containers { - containerId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) + containerId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} - if ls.pods[podId].lastTimestamps != nil { - podLogOpts.SinceTime = ls.pods[podId].lastTimestamps[containerId] + if _, ok := ls.pods[podId.String()].lastTimestamps[containerId.String()]; ok { + podLogOpts.SinceTime = ls.pods[podId.String()].lastTimestamps[containerId.String()] } log, err := ls.clientset.CoreV1().Pods(element.pod.Namespace).GetLogs(element.pod.Name, &podLogOpts).Do(ctx).Raw() if err != nil { - fmt.Println(errors.Wrap(err, "error reading container log")) + ls.logger.Fatal(errors.Wrap(err, "error reading container log")) continue } - times, messages, err := ls.splitTimestampsFromMessages(log, podId, containerId) + times, messages, lastTimestamp, err := ls.splitTimestampsFromMessages(log, podId.String(), containerId.String()) if err != nil { return err } @@ -197,44 +206,37 @@ func (ls *containerLogSync) Run(ctx context.Context, upsertChannel <-chan contra } newLog := &schema.ContainerLog{ - ContainerId: containerId[:], - PodId: podId[:], + ContainerId: containerId, + PodId: podId, Time: strings.Join(times, "\n"), Log: strings.Join(messages, "\n"), } select { - case upserts <- newLog: + case databaseUpserts <- newLog: case <-ctx.Done(): return ctx.Err() } - lastTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", times[len(times)-1]) - if err != nil { - return errors.Wrap(err, "error parsing log time") - } - - lastV1Time := kmetav1.Time{Time: lastTime} - - if _, ok := ls.pods[podId]; !ok { + if _, ok := ls.pods[podId.String()]; !ok { continue } - ls.pods[podId].lastTimestamps[containerId] = &lastV1Time + ls.pods[podId.String()].lastTimestamps[containerId.String()] = &kmetav1.Time{Time: lastTimestamp} } } select { case <-ctx.Done(): return ctx.Err() - case <-time.After(time.Second * 15): + case <-time.After(ls.period): } } }) g.Go(func() error { - return database.NewUpsert(ls.db).WithStatement(upsertStmt, 5).Stream(ctx, upserts) + return database.NewUpsert(ls.db, database.WithStatement(ls.upsertStmt(), 5)).Stream(ctx, databaseUpserts) }) return g.Wait() diff --git a/pkg/sync/options.go b/pkg/sync/options.go new file mode 100644 index 00000000..1a874761 --- /dev/null +++ b/pkg/sync/options.go @@ -0,0 +1,37 @@ +package sync + +import "github.com/icinga/icinga-kubernetes/pkg/contracts" + +// syncOption is a functional option for NewSync. +type syncOption func(options *syncOptions) + +// syncOptions stores options for sync. +type syncOptions struct { + forwardUpserts chan<- contracts.KUpsert + forwardDeletes chan<- contracts.KDelete +} + +// newSyncOptions returns a new syncOptions initialized with the given options. +func newSyncOptions(options ...syncOption) *syncOptions { + syncOpts := &syncOptions{} + + for _, option := range options { + option(syncOpts) + } + + return syncOpts +} + +// WithForwardUpserts forwards added and updated Kubernetes resources to the specific channel. +func WithForwardUpserts(channel chan<- contracts.KUpsert) syncOption { + return func(options *syncOptions) { + options.forwardUpserts = channel + } +} + +// WithForwardDeletes forwards deleted Kubernetes resources to the specific channel. +func WithForwardDeletes(channel chan<- contracts.KDelete) syncOption { + return func(options *syncOptions) { + options.forwardDeletes = channel + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index e04b5125..0b46ba84 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -15,7 +15,7 @@ import ( ) type Sync interface { - Run(context.Context, ...SyncOption) error + Run(context.Context, ...syncOption) error } type sync struct { @@ -31,46 +31,15 @@ func NewSync( informer kcache.SharedInformer, logger *logging.Logger, ) Sync { - s := &sync{ + return &sync{ db: db, informer: informer, logger: logger, factory: factory, } - - return s -} - -func WithForwardUpserts(channel chan<- contracts.KUpsert) SyncOption { - return func(options *SyncOptions) { - options.forwardUpserts = channel - } -} - -func WithForwardDeletes(channel chan<- contracts.KDelete) SyncOption { - return func(options *SyncOptions) { - options.forwardDeletes = channel - } } -type SyncOption func(options *SyncOptions) - -type SyncOptions struct { - forwardUpserts chan<- contracts.KUpsert - forwardDeletes chan<- contracts.KDelete -} - -func NewSyncOptions(options ...SyncOption) *SyncOptions { - syncOptions := &SyncOptions{} - - for _, option := range options { - option(syncOptions) - } - - return syncOptions -} - -func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { +func (s *sync) Run(ctx context.Context, options ...syncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -96,132 +65,77 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Debug("Finished warming up") - s.factory().GetResourceVersion() - - syncOptions := NewSyncOptions(execOptions...) - - // init upsert channel spreader - multiplexUpsertChannel := make(chan contracts.KUpsert) - defer close(multiplexUpsertChannel) - - multiplexUpsert := NewChannelMux(multiplexUpsertChannel) + syncOpts := newSyncOptions(options...) - upsertChannel := multiplexUpsert.NewOutChannel() + kupsertsMux := NewChannelMux(changes.Adds(), changes.Updates()) + kupserts := kupsertsMux.Out() - if syncOptions.forwardUpserts != nil { - multiplexUpsert.AddOutChannel(syncOptions.forwardUpserts) + if syncOpts.forwardUpserts != nil { + kupsertsMux.AddOut(syncOpts.forwardUpserts) } - // run upsert channel spreader g.Go(func() error { - return multiplexUpsert.Run(ctx) + return kupsertsMux.Run(ctx) }) - upsertToStream := make(chan database.Entity) - defer close(upsertToStream) + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) - for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { - ch := ch - - g.Go(func() error { - for { - select { - case kupsert, more := <-ch: - if !more { - return nil - } - - select { - case multiplexUpsertChannel <- kupsert: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() + g.Go(func() error { + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil } - } - }) - g.Go(func() error { - for { + entity := s.factory() + entity.SetID(kupsert.ID()) + entity.SetCanonicalName(kupsert.GetCanonicalName()) + entity.Obtain(kupsert.KObject()) + select { - case kupsert, more := <-upsertChannel: - if !more { - return nil - } - - entity := s.factory() - entity.SetID(kupsert.ID()) - entity.SetCanonicalName(kupsert.GetCanonicalName()) - entity.Obtain(kupsert.KObject()) - - select { - case upsertToStream <- entity: - s.logger.Debugw( - fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), - zap.String("id", kupsert.ID().String())) - case <-ctx.Done(): - return ctx.Err() - } + case databaseUpserts <- entity: + s.logger.Debugw( + fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), + zap.String("id", kupsert.ID().String())) case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } - }) - } + } + }) g.Go(func() error { - return database.NewUpsert(s.db).Stream(ctx, upsertToStream) + return s.db.UpsertStreamed(ctx, databaseUpserts) }) - // init delete channel spreader - multiplexDeleteChannel := make(chan contracts.KDelete) - defer close(multiplexDeleteChannel) - - multiplexDelete := NewChannelMux(multiplexDeleteChannel) - - deleteChannel := multiplexDelete.NewOutChannel() + kdeletesMux := NewChannelMux(changes.Deletes()) + kdeletes := kdeletesMux.Out() - if syncOptions.forwardDeletes != nil { - multiplexDelete.AddOutChannel(syncOptions.forwardDeletes) + if syncOpts.forwardDeletes != nil { + kdeletesMux.AddOut(syncOpts.forwardDeletes) } - // run delete channel spreader - g.Go(func() error { - return multiplexDelete.Run(ctx) - }) - g.Go(func() error { - for { - select { - case kdelete, more := <-changes.Deletes(): - if !more { - return nil - } - select { - case multiplexDeleteChannel <- kdelete: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } + return kdeletesMux.Run(ctx) }) - deleteToStream := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deleteToStream) + defer close(databaseDeletes) for { select { - case kdelete, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } select { - case deleteToStream <- kdelete.ID(): + case databaseDeletes <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -235,7 +149,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) + return s.db.DeleteStreamed(ctx, s.factory(), databaseDeletes) }) g.Go(func() error {