forked from project-flogo/stream
/
definition.go
69 lines (55 loc) · 1.59 KB
/
definition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package pipeline
import (
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/mapper"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/data/resolve"
"github.com/project-flogo/core/support"
"github.com/project-flogo/core/support/log"
)
// DefinitionConfig is the configuration from which NewDefinition builds a
// pipeline Definition.
type DefinitionConfig struct {
	// id is the pipeline identifier copied into the Definition. It is
	// unexported, so encoding/json neither reads nor writes it.
	id string
	// Name is the pipeline name (JSON key "name").
	Name string `json:"name"`
	// Metadata is the IO metadata of the pipeline (JSON key "metadata").
	Metadata *metadata.IOMetadata `json:"metadata"`
	// Stages holds one configuration per pipeline stage (JSON key "stages");
	// NewDefinition creates the stages in this order.
	Stages []*StageConfig `json:"stages"`
}
// NewDefinition builds a pipeline Definition from config.
//
// The id, name and IO metadata are taken from config, and one Stage is created
// per entry of config.Stages, in order, by calling NewStage with the supplied
// mapper factory and resolver. The first error returned by NewStage aborts
// construction: NewDefinition then returns a nil Definition and that error.
func NewDefinition(config *DefinitionConfig, mf mapper.Factory, resolver resolve.CompositeResolver) (*Definition, error) {
	pipelineDef := &Definition{
		id:       config.id,
		name:     config.Name,
		metadata: config.Metadata,
	}

	for i := range config.Stages {
		built, err := NewStage(config.Stages[i], mf, resolver)
		if err != nil {
			return nil, err
		}
		pipelineDef.stages = append(pipelineDef.stages, built)
	}

	return pipelineDef, nil
}
// Definition is a pipeline definition as built by NewDefinition: its identity,
// its IO metadata and its ordered stages.
type Definition struct {
	// id is the pipeline identifier, returned by Id.
	id string
	// name is the pipeline name, returned by Name.
	name string
	// stages are the pipeline stages in the order they were configured.
	stages []*Stage
	// metadata is the IO metadata of the pipeline, returned by Metadata.
	metadata *metadata.IOMetadata
}
// Metadata returns the IO metadata of the pipeline. The stored pointer is
// returned as is, not a copy.
func (d *Definition) Metadata() *metadata.IOMetadata {
	return d.metadata
}
// Id returns the identifier of the pipeline definition.
func (d *Definition) Id() string {
	return d.id
}
// Name returns the name of the pipeline definition.
func (d *Definition) Name() string {
	return d.name
}
// Cleanup gives the activities used by this pipeline's stages a chance to
// release their resources.
//
// For every stage whose activity is not reported as a singleton by
// activity.IsSingleton and which implements support.NeedsCleanup, the
// activity's Cleanup method is called. A failing cleanup is logged as a
// warning and does not prevent the remaining stages from being processed.
//
// Cleanup is best-effort and always returns nil.
func (d *Definition) Cleanup() error {
	for _, stage := range d.stages {
		// Singleton activities are not cleaned up by an individual pipeline.
		if activity.IsSingleton(stage.act) {
			continue
		}

		needsCleanup, ok := stage.act.(support.NeedsCleanup)
		if !ok {
			continue
		}

		if err := needsCleanup.Cleanup(); err != nil {
			// The trailing %v is required: without a verb for err the message
			// ends in "%!(EXTRA ...)" instead of the error text.
			log.RootLogger().Warnf("Error cleaning up activity '%s' in stream pipeline '%s' : %v", activity.GetRef(stage.act), d.name, err)
		}
	}

	return nil
}