This repository has been archived by the owner on Aug 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.go
115 lines (96 loc) · 2.53 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package gomavlib
import (
"io"
)
// Channel is a communication channel created by an endpoint. A TCP client
// endpoint, for example, creates exactly one channel, whereas a TCP server
// endpoint creates one channel per incoming connection.
type Channel struct {
	// Endpoint is the endpoint which the channel belongs to.
	Endpoint Endpoint

	// label is the text returned by String.
	label string
	// rwc is the underlying stream: it is handed to the parser as both its
	// reader and its writer, and it is closed by close.
	rwc io.ReadWriteCloser
	// n is the node whose configuration and event queues the channel uses.
	n *Node
	// parser reads incoming frames and writes outgoing ones.
	parser *Parser
	// writeChan feeds the writer goroutine started by run; that goroutine
	// handles Message and Frame values and ignores anything else.
	writeChan chan interface{}
	// allWritten is signalled by the writer goroutine after writeChan has
	// been closed and its loop has ended; close waits on it.
	allWritten chan struct{}
}
// newChannel builds the Channel through which node n exchanges frames over
// rwc on behalf of endpoint e. label becomes the value returned by String.
//
// The channel's parser uses rwc as both reader and writer and takes its
// dialect, keys and outgoing system/component ids from n.conf; the outgoing
// signature link id comes from randomByte. No goroutine is started here: the
// returned channel does nothing until run is called.
func newChannel(n *Node, e Endpoint, label string, rwc io.ReadWriteCloser) *Channel {
	parserConf := ParserConf{
		Reader:             rwc,
		Writer:             rwc,
		Dialect:            n.conf.Dialect,
		InKey:              n.conf.InKey,
		OutSystemId:        n.conf.OutSystemId,
		OutComponentId:     n.conf.OutComponentId,
		OutSignatureLinkId: randomByte(),
		OutKey:             n.conf.OutKey,
	}

	// NOTE(review): the second result of NewParser (presumably an error) is
	// discarded, so parser is stored as returned even on failure. This looks
	// like it relies on the node configuration having been validated before
	// any channel is created - confirm against the Node constructor.
	parser, _ := NewParser(parserConf)

	return &Channel{
		Endpoint:   e,
		label:      label,
		rwc:        rwc,
		n:          n,
		parser:     parser,
		writeChan:  make(chan interface{}),
		allWritten: make(chan struct{}),
	}
}
// String returns the label the channel was created with, which makes
// *Channel satisfy fmt.Stringer.
func (ch *Channel) String() string {
	return ch.label
}
// close shuts the channel down in three ordered steps: it closes writeChan so
// that the writer goroutine started by run leaves its loop, waits for that
// goroutine's signal on allWritten, and only then closes the underlying
// reader/writer.
//
// Because it closes writeChan, calling close a second time panics.
func (ch *Channel) close() {
	// closing writeChan ends the writer's range loop once it has finished
	// the value it is currently handling
	close(ch.writeChan)

	// wait until all frames have been written
	<-ch.allWritten

	// close the reader/writer only after the writer is done with it; close
	// has no return value, so the error from Close is dropped
	_ = ch.rwc.Close()
}
// run starts the channel's reader and writer goroutines and blocks until both
// have finished.
//
// Reader: sends EventChannelOpen on ch.n.eventsOut, then loops on
// ch.parser.Read. Each frame becomes an EventFrame that is first passed to
// ch.n.nodeStreamRequest (when non-nil) and then sent on ch.n.eventsOut. A
// *ParserError is reported as EventParseError and reading continues; any other
// error ends the loop. On exit the reader sends eventInChannelClosed on
// ch.n.eventsIn, then EventChannelClose on ch.n.eventsOut.
//
// Writer: consumes writeChan until it is closed. A Message is wrapped in a
// FrameV1 when ch.n.conf.OutVersion is V1 and in a FrameV2 otherwise; a Frame
// is written as it is. Values of any other type are ignored. On exit the
// writer signals allWritten, which close waits for.
func (ch *Channel) run() {
	readerDone := make(chan struct{})
	writerDone := make(chan struct{})

	readLoop := func() {
		// deferred calls run last-in-first-out, so on exit the order is:
		// eventInChannelClosed, EventChannelClose, readerDone
		defer func() { readerDone <- struct{}{} }()
		defer func() { ch.n.eventsOut <- &EventChannelClose{ch} }()
		defer func() { ch.n.eventsIn <- &eventInChannelClosed{ch} }()

		ch.n.eventsOut <- &EventChannelOpen{ch}

		for {
			frame, err := ch.parser.Read()
			if err == nil {
				evt := &EventFrame{frame, ch}
				if ch.n.nodeStreamRequest != nil {
					ch.n.nodeStreamRequest.onEventFrame(evt)
				}
				ch.n.eventsOut <- evt
				continue
			}

			// anything other than a parse error terminates the reader
			if _, isParseErr := err.(*ParserError); !isParseErr {
				return
			}

			// parse errors are reported and reading goes on
			ch.n.eventsOut <- &EventParseError{err, ch}
		}
	}

	writeLoop := func() {
		// on exit: first signal allWritten (received by close), then writerDone
		defer func() { writerDone <- struct{}{} }()
		defer func() { ch.allWritten <- struct{}{} }()

		// The boolean handed to Write is false for messages wrapped here and
		// true for ready-made frames; its meaning is defined by Parser.Write.
		// Whatever Write returns is not checked.
		for item := range ch.writeChan {
			switch out := item.(type) {
			case Message:
				switch ch.n.conf.OutVersion {
				case V1:
					ch.parser.Write(&FrameV1{Message: out}, false)
				default:
					ch.parser.Write(&FrameV2{Message: out}, false)
				}

			case Frame:
				ch.parser.Write(out, true)
			}
		}
	}

	go readLoop()
	go writeLoop()

	<-readerDone
	<-writerDone
}