forked from quickfixgo/quickfix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
registry.go
143 lines (112 loc) · 3.21 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package quickfix
import (
"fmt"
)
// Marshaler is implemented by any type that can render itself as a
// quickfix Message.
type Marshaler interface {
// Marshal returns the Message form of the receiver.
Marshal() Message
}
// Send routes m to a session derived from the message's own header. It
// marshals m, reads the BeginString, TargetCompID and SenderCompID header
// fields, assembles a SessionID from them and hands the message to
// SendToTarget.
//
// It returns the error from the first header field that cannot be read,
// otherwise whatever SendToTarget returns.
func Send(m Marshaler) (err error) {
	msg := m.Marshal()

	var beginString FIXString
	if err := msg.Header.GetField(tagBeginString, &beginString); err != nil {
		return err
	}

	var targetCompID FIXString
	if err := msg.Header.GetField(tagTargetCompID, &targetCompID); err != nil {
		return err
	}

	var senderCompID FIXString
	if err := msg.Header.GetField(tagSenderCompID, &senderCompID); err != nil {
		// Propagate the failure: returning nil here would report success to
		// the caller even though the message was never handed to a session.
		return err
	}

	sessionID := SessionID{BeginString: string(beginString), TargetCompID: string(targetCompID), SenderCompID: string(senderCompID)}
	return SendToTarget(msg, sessionID)
}
// SendToTarget marshals m and passes the result to the send method of the
// session registered under sessionID. It is convenient inside FromApp, which
// already provides the session ID of the incoming message.
//
// If lookupSession reports an error, that error is returned and nothing is
// sent; otherwise the result is nil.
func SendToTarget(m Marshaler, sessionID SessionID) error {
	outbound := m.Marshal()

	target, lookupErr := lookupSession(sessionID)
	if lookupErr != nil {
		return lookupErr
	}

	target.send(outbound)
	return nil
}
// sessionActivate is the request sent on registry.activate. The embedded
// SessionID names the session to activate.
type sessionActivate struct {
SessionID
// reply receives the session when activation succeeds, or nil when the ID
// is unknown or the session is already active.
reply chan *session
}
// sessionResource is the record the registry server loop keeps per session.
type sessionResource struct {
// session is the registered session.
session *session
// active is set by an activate request and cleared by a deactivate request.
active bool
}
// sessionLookupResponse is the answer to a sessionLookup request.
type sessionLookupResponse struct {
// session is the session found for the requested ID; nil when err is set.
session *session
// err is non-nil when no session is registered under the requested ID.
err error
}
// sessionLookup is the request sent on registry.lookup. The embedded
// SessionID names the session being looked up.
type sessionLookup struct {
SessionID
// reply receives the outcome of the lookup.
reply chan sessionLookupResponse
}
// registry groups the request channels served by sessionResourceServerLoop.
type registry struct {
// newSession delivers sessions to be recorded; they start out inactive.
newSession chan *session
// activate delivers requests to mark a recorded session active.
activate chan sessionActivate
// deactivate delivers IDs of sessions to mark inactive.
deactivate chan SessionID
// lookup delivers requests to find a recorded session by ID.
lookup chan sessionLookup
}
// sessions is the package-level registry; init allocates it and starts its
// server loop.
var sessions *registry
func init() {
sessions = new(registry)
sessions.newSession = make(chan *session)
sessions.activate = make(chan sessionActivate)
sessions.deactivate = make(chan SessionID)
sessions.lookup = make(chan sessionLookup)
go sessions.sessionResourceServerLoop()
}
// activate asks the registry to mark the session identified by sessionID as
// active and blocks until the registry answers. It returns the value the
// registry sends back on the reply channel.
func activate(sessionID SessionID) *session {
	reply := make(chan *session)
	sessions.activate <- sessionActivate{SessionID: sessionID, reply: reply}
	return <-reply
}
// deactivate sends sessionID on the registry's deactivate channel and
// returns once the registry has received it.
func deactivate(sessionID SessionID) {
	requests := sessions.deactivate
	requests <- sessionID
}
// lookupSession asks the registry for the session associated with sessionID
// and blocks until it answers. It returns the session and error carried by
// the registry's response.
func lookupSession(sessionID SessionID) (*session, error) {
	reply := make(chan sessionLookupResponse)
	sessions.lookup <- sessionLookup{SessionID: sessionID, reply: reply}

	result := <-reply
	return result.session, result.err
}
// sessionResourceServerLoop keeps the table of known sessions and serves the
// registry's request channels one request at a time. It never returns.
func (r *registry) sessionResourceServerLoop() {
	resources := make(map[SessionID]*sessionResource)

	for {
		select {
		case s := <-r.newSession:
			// A newly recorded session starts inactive; recording an ID that
			// already exists replaces the previous entry.
			resources[s.sessionID] = &sessionResource{session: s, active: false}

		case id := <-r.deactivate:
			// Deactivating an unknown ID is a no-op.
			if res, known := resources[id]; known {
				res.active = false
			}

		case req := <-r.lookup:
			res, known := resources[req.SessionID]
			if !known {
				req.reply <- sessionLookupResponse{session: nil, err: fmt.Errorf("session not found")}
				continue
			}
			req.reply <- sessionLookupResponse{session: res.session, err: nil}

		case req := <-r.activate:
			res, known := resources[req.SessionID]
			if known && !res.active {
				res.active = true
				req.reply <- res.session
				continue
			}
			// Unknown ID or already active: refuse by replying nil.
			req.reply <- nil
		}
	}
}