forked from TIBCOSoftware/flogo-contrib
/
activity.go
executable file
·101 lines (72 loc) · 2.49 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package aggregate_old
import (
"errors"
"sync"
"github.com/TIBCOSoftware/flogo-contrib/activity/aggregate_old/aggregator"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/core/data"
"github.com/TIBCOSoftware/flogo-lib/logger"
)
// activityLogger is the default logger for the Aggregate Activity
var activityLogger = logger.GetLogger("activity-aggregate_old")
const (
ivFunction = "function"
ivWindowSize = "windowSize"
ivValue = "value"
ovResult = "result"
ovReport = "report"
)
func init() {
activityLogger.SetLogLevel(logger.InfoLevel)
}
// AggregateActivity is an Activity that is used to Aggregate a message to the console
// inputs : {function, windowSize, autoRest, value}
// outputs: {result, report}
type AggregateActivity struct {
metadata *activity.Metadata
mutex *sync.RWMutex
// aggregators stateful map of aggregators
aggregators map[string]aggregator.Aggregator
}
// NewActivity creates a new AppActivity
func NewActivity(metadata *activity.Metadata) activity.Activity {
return &AggregateActivity{metadata: metadata, aggregators: make(map[string]aggregator.Aggregator), mutex: &sync.RWMutex{}}
}
// Metadata returns the activity's metadata
func (a *AggregateActivity) Metadata() *activity.Metadata {
return a.metadata
}
// Eval implements api.Activity.Eval - Aggregates the Message
func (a *AggregateActivity) Eval(context activity.Context) (done bool, err error) {
aggregatorKey := context.ActivityHost().Name() + ":" + context.Name()
a.mutex.RLock()
//get aggregator for activity, assumes flow & task names are unique
aggr, ok := a.aggregators[aggregatorKey]
a.mutex.RUnlock()
//if window not create for this flow, create it
if !ok {
//go doesn't have lock upgrades or try, so do same check again
a.mutex.Lock()
aggr, ok = a.aggregators[aggregatorKey]
if !ok {
windowSize, _ := context.GetInput(ivWindowSize).(int)
aggrName, _ := context.GetInput(ivFunction).(string)
factory := aggregator.GetFactory(aggrName)
if factory == nil {
return false, errors.New("Unknown aggregator: " + aggrName)
}
aggr = factory(windowSize)
a.aggregators[aggregatorKey] = aggr
activityLogger.Debug("Aggregator created for ", aggregatorKey)
}
a.mutex.Unlock()
}
value, ok := context.GetInput(ivValue).(float64)
if !ok {
value, _ = data.CoerceToNumber(context.GetInput(ivValue))
}
report, result := aggr.Add(value)
context.SetOutput(ovReport, report)
context.SetOutput(ovResult, result)
return true, nil
}