/
outputstream.go
124 lines (98 loc) · 2.44 KB
/
outputstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright ©2017 The go-hep Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package fwk
import (
"reflect"
)
// OutputStream implements a task writing data to an OutputStreamer.
//
// OutputStream is concurrent-safe.
//
// OutputStream declares a property 'Ports', a []fwk.Port, which will
// be used to declare the input ports the task will access to,
// writing out data via the underlying OutputStreamer.
//
// OutputStream declares a property 'Streamer', a fwk.OutputStreamer,
// which will be used to actually write data to.
type OutputStream struct {
TaskBase
streamer OutputStreamer
ctrl StreamControl
}
// Configure declares the input ports defined by the 'Ports' property.
func (tsk *OutputStream) Configure(ctx Context) error {
var err error
for _, port := range tsk.ctrl.Ports {
err = tsk.DeclInPort(port.Name, port.Type)
if err != nil {
return err
}
}
return err
}
// StartTask starts the OutputStreamer task
func (tsk *OutputStream) StartTask(ctx Context) error {
return nil
}
// StopTask stops the OutputStreamer task
func (tsk *OutputStream) StopTask(ctx Context) error {
return tsk.disconnect()
}
func (tsk *OutputStream) connect(ctrl StreamControl) error {
ctrl.Ports = make([]Port, len(tsk.ctrl.Ports))
copy(ctrl.Ports, tsk.ctrl.Ports)
tsk.ctrl = ctrl
err := tsk.streamer.Connect(ctrl.Ports)
if err != nil {
return err
}
go tsk.write()
return err
}
func (tsk *OutputStream) disconnect() error {
return tsk.streamer.Disconnect()
}
func (tsk *OutputStream) write() {
for {
select {
case ctx := <-tsk.ctrl.Ctx:
tsk.ctrl.Err <- tsk.streamer.Write(ctx)
case <-tsk.ctrl.Quit:
return
}
}
}
// Process gets data from the store and
// writes it out via the underlying OutputStreamer
func (tsk *OutputStream) Process(ctx Context) error {
var err error
tsk.ctrl.Ctx <- ctx
err = <-tsk.ctrl.Err
if err != nil {
return err
}
return err
}
func newOutputStream(typ, name string, mgr App) (Component, error) {
var err error
tsk := &OutputStream{
TaskBase: NewTask(typ, name, mgr),
streamer: nil,
ctrl: StreamControl{
Ports: make([]Port, 0),
},
}
err = tsk.DeclProp("Ports", &tsk.ctrl.Ports)
if err != nil {
return nil, err
}
err = tsk.DeclProp("Streamer", &tsk.streamer)
if err != nil {
return nil, err
}
return tsk, err
}
func init() {
Register(reflect.TypeOf(OutputStream{}), newOutputStream)
}