Skip to content

Commit

Permalink
feat: managed services
Browse files Browse the repository at this point in the history
  • Loading branch information
hjwalt committed Apr 10, 2024
1 parent 5067a86 commit 30066f2
Show file tree
Hide file tree
Showing 11 changed files with 1,114 additions and 0 deletions.
86 changes: 86 additions & 0 deletions managed/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package managed

import (
"context"

"github.com/hjwalt/runway/inverse"
)

const (
QualifierLifecycle = "lifecycle"
QualifierRetry = "retry"
QualifierHttpHandler = "http-handler"
QualifierHealth = "health"
)

// lifecycle: register -> resolve -> clean
type Component interface {
Name() string
Register(context.Context, inverse.Container) error
Resolve(context.Context, inverse.Container) error
Clean() error
}

// lifecycle: register -> resolve -> start -> stop -> clean
type Service interface {
Component

Start() error
Stop() error
}

// special archetypes

type Configuration interface {
Component

Has(key string) bool
Get() map[string]string
GetString(key string, defaultValue string) string
GetBool(key string, defaultValue bool) bool
GetInt32(key string, defaultValue int32) int32
GetInt64(key string, defaultValue int64) int64
GetUint32(key string, defaultValue uint32) uint32
GetUint64(key string, defaultValue uint64) uint64
}

type Health interface {
Component

GetString() map[string]string
SetString(component string, key string, value string)
GetBool() map[string]bool
SetBool(component string, key string, value bool)
GetInt() map[string]int64
SetInt(component string, key string, value int64)
IncInt(component string, key string, value int64)
DecInt(component string, key string, value int64)
}

type Retry interface {
Component

Do(fnToDo func(int64) error) error
}

type Lifecycle interface {
Service

Running() bool
Error(error)
}

// Running functions

type Runnable interface {
Run() error
}

type Channel[T any] interface {
Channel() (<-chan T, error)
Loop(T) error
}

type Loop interface {
Loop() error
}
39 changes: 39 additions & 0 deletions managed/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package managed

import "errors"

func NewChannel[T any](channel Channel[T]) Service {
c := &channelRunnable[T]{
channel: channel,
}
return NewRunner(c)
}

type channelRunnable[T any] struct {
channel Channel[T]
}

func (r *channelRunnable[T]) Run() error {
if r.channel == nil {
return ErrChannelRuntimeNoChannel
}

channel, channelErr := r.channel.Channel()
if channelErr != nil {
return channelErr
}

for v := range channel {
err := r.channel.Loop(v)
if err != nil {
return err
}
}

return nil
}

// Errors
var (
ErrChannelRuntimeNoChannel = errors.New("functional runtime no Channel function provided")
)
222 changes: 222 additions & 0 deletions managed/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package managed_test

import (
"context"
"errors"
"testing"

"github.com/hjwalt/runway/inverse"
"github.com/hjwalt/runway/managed"
"github.com/stretchr/testify/assert"
)

func TestChannelWillStopNormally(t *testing.T) {
assert := assert.New(t)

value := 0

loop := &TestChannel{
loop: func(data *TestData) error {
data.value += 1
value += 1
return nil
},
}

fnRuntime := managed.NewChannel[*TestData](loop)

services := []managed.Service{}
services = append(services, fnRuntime)
services = append(services, loop)
manager := managed.New(services, []managed.Component{}, []managed.Configuration{})

startErr := manager.Start()
assert.NoError(startErr)

loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}

manager.Stop()

assert.Equal(10, value)
}

func TestChannelBrokenInit(t *testing.T) {
assert := assert.New(t)

value := 0

loop := &TestChannel{
brokenInit: true,
loop: func(data *TestData) error {
data.value += 1
value += 1
return nil
},
}

fnRuntime := managed.NewChannel[*TestData](loop)

services := []managed.Service{}
services = append(services, fnRuntime)
services = append(services, loop)
manager := managed.New(services, []managed.Component{}, []managed.Configuration{})

startErr := manager.Start()
assert.Error(startErr)

manager.Stop()
assert.Equal(0, value)
}

func TestChannelBrokenChannel(t *testing.T) {
assert := assert.New(t)

value := 0

loop := &TestChannel{
brokenChannel: true,
loop: func(data *TestData) error {
data.value += 1
value += 1
return nil
},
}

fnRuntime := managed.NewChannel[*TestData](loop)

services := []managed.Service{}
services = append(services, fnRuntime)
services = append(services, loop)
manager := managed.New(services, []managed.Component{}, []managed.Configuration{})

startErr := manager.Start()
assert.NoError(startErr)
manager.Stop()
assert.Equal(0, value)
}

func TestChannelNilChannel(t *testing.T) {
assert := assert.New(t)

value := 0

fnRuntime := managed.NewChannel[*TestData](nil)

services := []managed.Service{}
services = append(services, fnRuntime)
manager := managed.New(services, []managed.Component{}, []managed.Configuration{})

startErr := manager.Start()
assert.NoError(startErr)
manager.Stop()
assert.Equal(0, value)
}

func TestChannelBrokenLoop(t *testing.T) {
assert := assert.New(t)

value := 0

loop := &TestChannel{
loop: func(data *TestData) error {
data.value += 1
value += 1
if value == 3 {
return errors.New("broken value")
}
return nil
},
}

fnRuntime := managed.NewChannel[*TestData](loop)

services := []managed.Service{}
services = append(services, fnRuntime)
services = append(services, loop)
manager := managed.New(services, []managed.Component{}, []managed.Configuration{})

startErr := manager.Start()

loop.channel <- &TestData{}
loop.channel <- &TestData{}
loop.channel <- &TestData{}

manager.Stop()

assert.NoError(startErr)
assert.Equal(3, value)
}

type TestChannel struct {
brokenInit bool
brokenChannel bool
channel chan *TestData
loop func(*TestData) error
}

func (r *TestChannel) Name() string {
return "test-channel"
}

func (r *TestChannel) Register(ctx context.Context, ic inverse.Container) error {
return nil
}

func (r *TestChannel) Resolve(ctx context.Context, ic inverse.Container) error {
return nil
}

func (r *TestChannel) Clean() error {
return nil
}

func (l *TestChannel) Start() error {
l.channel = make(chan *TestData)
if l.brokenInit {
return errors.New("broken init")
}
return nil
}

func (l *TestChannel) Stop() error {
alreadyClosed := false
defer func() {
if recover() != nil {
// The return result can be altered
// in a defer function call.
alreadyClosed = true
}
}()

close(l.channel)

if alreadyClosed {
return errors.New("channel already closed")
} else {
return nil
}
}

func (l *TestChannel) Channel() (<-chan *TestData, error) {
if l.brokenChannel {
return nil, errors.New("broken channel")
}
return l.channel, nil
}

func (l *TestChannel) Loop(d *TestData) error {
return l.loop(d)
}

type TestData struct {
value int
}
Loading

0 comments on commit 30066f2

Please sign in to comment.