Skip to content

Commit

Permalink
Fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Nov 29, 2023
1 parent c67bf64 commit bbed616
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 176 deletions.
7 changes: 1 addition & 6 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ func main() {

g, ctx := errgroup.WithContext(ctx)

forwardUpsertPodsChannel := make(chan database.Entity)
forwardDeletePodsChannel := make(chan any)
defer close(forwardUpsertPodsChannel)
defer close(forwardDeletePodsChannel)

g.Go(func() error {
return sync.NewSync(
db, schema.NewNode, informers.Core().V1().Nodes().Informer(), logs.GetChildLogger("Nodes"),
Expand All @@ -101,7 +96,7 @@ func main() {
)
})

logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs"))
logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs"))

g.Go(func() error {
return logSync.Run(ctx, podUpserts, podDeletes)
Expand Down
13 changes: 7 additions & 6 deletions pkg/schema/log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package schema

type Log struct {
import "github.com/icinga/icinga-go-library/types"

type ContainerLog struct {
kmetaWithoutNamespace
Id []byte
ReferenceId []byte
ContainerName string
Time string
Log string
ContainerId types.Binary
PodId types.Binary
Time string
Log string
}
97 changes: 97 additions & 0 deletions pkg/sync/channel-mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package sync

import (
"context"
"sync/atomic"
)

// ChannelMux is a multiplexer for channels of variable types.
// It fans all input channels to all output channels.
type ChannelMux[T any] interface {

// AddInChannel adds given input channel to the list of input channels.
AddInChannel(<-chan T)

// NewOutChannel returns and adds new output channel to the pods of created addedOutChannels.
NewOutChannel() <-chan T

// AddOutChannel adds given output channel to the list of added addedOutChannels.
AddOutChannel(chan<- T)

// Run combines output channel lists and starts multiplexing.
Run(context.Context) error
}

type channelMux[T any] struct {
inChannels []<-chan T
createdOutChannels []chan<- T
addedOutChannels []chan<- T
started atomic.Bool
}

// NewChannelMux creates new ChannelMux initialized with at least one input channel
func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] {
return &channelMux[T]{
inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...),
}
}

func (mux *channelMux[T]) AddInChannel(channel <-chan T) {
if mux.started.Load() {
panic("channelMux already started")
}

mux.inChannels = append(mux.inChannels, channel)
}

func (mux *channelMux[T]) NewOutChannel() <-chan T {
if mux.started.Load() {
panic("channelMux already started")
}

channel := make(chan T)
mux.createdOutChannels = append(mux.createdOutChannels, channel)

return channel
}

func (mux *channelMux[T]) AddOutChannel(channel chan<- T) {
if mux.started.Load() {
panic("channelMux already started")
}

mux.addedOutChannels = append(mux.addedOutChannels, channel)
}

func (mux *channelMux[T]) Run(ctx context.Context) error {
mux.started.Store(true)

defer func() {
for _, channelToClose := range mux.createdOutChannels {
close(channelToClose)
}
}()

outChannels := append(mux.addedOutChannels, mux.createdOutChannels...)

for {
for _, inChannel := range mux.inChannels {
select {
case spread, more := <-inChannel:
if !more {
return nil
}

for _, outChannel := range outChannels {
select {
case outChannel <- spread:
case <-ctx.Done():
return ctx.Err()
}
}
case <-ctx.Done():
return ctx.Err()
}
}
}
}
Loading

0 comments on commit bbed616

Please sign in to comment.