Skip to content

Commit

Permalink
refactor: groundwork for improving overall project structure.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Jan 5, 2021
1 parent 9dfe794 commit ece9561
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/pipelines/builder.go
@@ -0,0 +1 @@
package pipelines
14 changes: 14 additions & 0 deletions pkg/pipelines/definitions/pipeline.go
@@ -0,0 +1,14 @@
package definitions

import (
"context"

resourceDefinitions "github.com/andrewneudegg/delta/pkg/resources/definitions"
)

// Pipeline defines the construction of I->P->O.
type Pipeline interface {
resourceDefinitions.Resource

Do(context.Context)
}
86 changes: 86 additions & 0 deletions pkg/resources/builder.go
@@ -0,0 +1,86 @@
package resources

import (
"fmt"

"github.com/andrewneudegg/delta/pkg/resources/definitions"
"github.com/andrewneudegg/delta/pkg/resources/examples/noop"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

// BuildInputResource will construct a resource from the given inputs.
func BuildInputResource(identifier string, data interface{}) (definitions.Input, error) {
// Duplicate names are prohibited.
m := map[string]definitions.Input{
noop.Input{}.ID(): noop.Input{},
}

val, ok := m[identifier]
if !ok {
return nil, fmt.Errorf("source '%s' is unknown", identifier)
}

if err := mapstructure.Decode(data, &val); err != nil {
return nil, errors.Wrapf(err, "could not decode mapstructure for '%s'", identifier)
}

return val, nil
}

// BuildProcessResource will construct a resource from the given process.
func BuildProcessResource(identifier string, data interface{}) (definitions.Process, error) {
// Duplicate names are prohibited.
m := map[string]definitions.Process{
noop.Process{}.ID(): noop.Process{},
}

val, ok := m[identifier]
if !ok {
return nil, fmt.Errorf("source '%s' is unknown", identifier)
}

if err := mapstructure.Decode(data, &val); err != nil {
return nil, errors.Wrapf(err, "could not decode mapstructure for '%s'", identifier)
}

return val, nil
}

// BuildOutputResource will construct a resource from the given outputs.
func BuildOutputResource(identifier string, data interface{}) (definitions.Output, error) {
// Duplicate names are prohibited.
m := map[string]definitions.Output{
noop.Output{}.ID(): noop.Output{},
}

val, ok := m[identifier]
if !ok {
return nil, fmt.Errorf("source '%s' is unknown", identifier)
}

if err := mapstructure.Decode(data, &val); err != nil {
return nil, errors.Wrapf(err, "could not decode mapstructure for '%s'", identifier)
}

return val, nil
}

// BuildMetaResource will construct a resource from the given inputs.
func BuildMetaResource(identifier string, data interface{}) (definitions.Resource, error) {
// Duplicate names are prohibited.
m := map[string]definitions.Meta{
noop.Meta{}.ID(): noop.Meta{},
}

val, ok := m[identifier]
if !ok {
return nil, fmt.Errorf("source '%s' is unknown", identifier)
}

if err := mapstructure.Decode(data, &val); err != nil {
return nil, errors.Wrapf(err, "could not decode mapstructure for '%s'", identifier)
}

return val, nil
}
65 changes: 65 additions & 0 deletions pkg/resources/definitions/types.go
@@ -0,0 +1,65 @@
package definitions

import (
"context"

"github.com/andrewneudegg/delta/pkg/events"
)

// ResourceType explains what a resource is at runtime.
type ResourceType string

const (
// InputType of resource.
InputType ResourceType = "input"

// ProcessType of resource.
ProcessType ResourceType = "process"

// OutputType of resource.
OutputType ResourceType = "output"

// MetaType of resource.
MetaType ResourceType = "meta"
)

// Resource is a thing that does things.
type Resource interface {
ID() string // ID defines what this thing is.
Type() ResourceType // Type defines what type of resource this is.
}

// Input resource accepts events through some boundary.
type Input interface {
Resource

// DoInput will accept collections of events, passing them into the channel.
DoInput(context.Context, chan<- events.Collection) error
}

// Process resource transfers or manipulates events.
type Process interface {
Resource

// DoProcess will perform its function, on each collection placed into the channel
// eventually passing a similar collection to the output.
DoProcess(context.Context, <-chan events.Collection, chan<- events.Collection) error
}

// Output resource passes events across some boundary.
type Output interface {
Resource

// DoOutput will perform its function on each collection placed into the channel.
DoOutput(context.Context, <-chan events.Collection) error
}

// Meta resource augments one of the other resource types, or itself.
type Meta interface {
Resource

I([]Input) (Input, error) // I, accepts inputs returning an input representing the given collection.
P([]Process) (Process, error) // P, accepts process returning a process representing the given collection.
O([]Output) (Output, error) // O, accepts outputs returning an output representing the given collection.
M([]Meta) (Meta, error) // M, accepts meta resources returning a meta input for the given collection.
}
28 changes: 28 additions & 0 deletions pkg/resources/examples/noop/input.go
@@ -0,0 +1,28 @@
package noop

import (
"context"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/resources/definitions"
)

// Input is simple noop.
type Input struct {
}

// ID defines what this thing is.
func (i Input) ID() string {
return "resources/examples/simple"
}

// Type defines what type of resource this is.
func (i Input) Type() definitions.ResourceType {
return definitions.InputType
}

// DoInput will accept collections of events, passing them into the channel.
func (i Input) DoInput(ctx context.Context, ch chan<- events.Collection) error {
<-ctx.Done()
return nil
}
39 changes: 39 additions & 0 deletions pkg/resources/examples/noop/meta.go
@@ -0,0 +1,39 @@
package noop

import (
"github.com/andrewneudegg/delta/pkg/resources/definitions"
)

// Meta is simple noop.
type Meta struct {
}

// ID defines what this thing is.
func (m Meta) ID() string {
return "resources/examples/simple"
}

// Type defines what type of resource this is.
func (m Meta) Type() definitions.ResourceType {
return definitions.MetaType
}

// I accepts inputs returning an input representing the given collection.
func (m Meta) I(i []definitions.Input) (definitions.Input, error) {
return Input{}, nil
}

// P accepts process returning an process representing the given collection.
func (m Meta) P(p []definitions.Process) (definitions.Process, error) {
return Process{}, nil
}

// O accepts outputs returning an output representing the given collection.
func (m Meta) O(o []definitions.Output) (definitions.Output, error) {
return Output{}, nil
}

// M accepts meta resources returning a meta input for the given collection.
func (m Meta) M(me []definitions.Meta) (definitions.Meta, error) {
return Meta{}, nil
}
28 changes: 28 additions & 0 deletions pkg/resources/examples/noop/output.go
@@ -0,0 +1,28 @@
package noop

import (
"context"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/resources/definitions"
)

// Output is simple noop.
type Output struct {
}

// ID defines what this thing is.
func (o Output) ID() string {
return "resources/examples/simple"
}

// Type defines what type of resource this is.
func (o Output) Type() definitions.ResourceType {
return definitions.OutputType
}

// DoOutput will perform its function on each collection placed into the channel.
func (o Output) DoOutput(ctx context.Context, ch <-chan events.Collection) error {
<-ctx.Done()
return nil
}
29 changes: 29 additions & 0 deletions pkg/resources/examples/noop/process.go
@@ -0,0 +1,29 @@
package noop

import (
"context"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/resources/definitions"
)

// Process is simple noop.
type Process struct {
}

// ID defines what this thing is.
func (p Process) ID() string {
return "resources/examples/simple"
}

// Type defines what type of resource this is.
func (p Process) Type() definitions.ResourceType {
return definitions.ProcessType
}

// DoProcess will perform its function, on each collection placed into the channel
// eventually passing a similar collection to the output.
func (p Process) DoProcess(ctx context.Context, ch1 <-chan events.Collection, ch2 chan<- events.Collection) error {
<-ctx.Done()
return nil
}

0 comments on commit ece9561

Please sign in to comment.