-
Notifications
You must be signed in to change notification settings - Fork 0
/
invokeclient.go
117 lines (108 loc) · 2.79 KB
/
invokeclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package signalr
import (
"errors"
"fmt"
"sync"
"time"
)
// invokeClient keeps track of invocations that are waiting for a completion
// message and routes each completion to the channels of its invocation.
type invokeClient struct {
	// mx guards resultChans.
	mx sync.Mutex
	// resultChans holds the channel pair of every pending invocation, keyed by
	// invocation ID.
	resultChans map[string]invocationResultChans
	// protocol is used to unmarshal the result carried by a completion message.
	protocol hubProtocol
	// chanReceiveTimeout bounds how long delivery of a completion into the
	// invocation's channels may take before a timeout error is returned.
	chanReceiveTimeout time.Duration
}
// newInvokeClient builds an invokeClient with an empty set of pending
// invocations. protocol is used to decode completion results and
// chanReceiveTimeout limits how long delivering a completion may take.
func newInvokeClient(protocol hubProtocol, chanReceiveTimeout time.Duration) *invokeClient {
	// The zero value of the embedded mutex is ready to use, so only the
	// remaining fields need to be set.
	client := new(invokeClient)
	client.resultChans = map[string]invocationResultChans{}
	client.protocol = protocol
	client.chanReceiveTimeout = chanReceiveTimeout
	return client
}
// invocationResultChans is the pair of channels through which the outcome of a
// single invocation is delivered.
type invocationResultChans struct {
	// resultChan carries the unmarshaled result of the invocation.
	resultChan chan interface{}
	// errChan carries the error of the invocation, or nil after a result.
	errChan chan error
}
// newInvocation registers a pending invocation under id and returns the result
// and error channels on which its outcome will be delivered. Both channels
// have a buffer of one element. An existing registration with the same id is
// replaced.
func (i *invokeClient) newInvocation(id string) (chan interface{}, chan error) {
	results := make(chan interface{}, 1)
	errs := make(chan error, 1)

	i.mx.Lock()
	defer i.mx.Unlock()

	i.resultChans[id] = invocationResultChans{resultChan: results, errChan: errs}
	return results, errs
}
// deleteInvocation removes the pending invocation registered under id and
// closes both of its channels. It does nothing if id is not registered.
func (i *invokeClient) deleteInvocation(id string) {
	i.mx.Lock()
	defer i.mx.Unlock()

	pending, found := i.resultChans[id]
	if !found {
		return
	}
	delete(i.resultChans, id)
	close(pending.resultChan)
	close(pending.errChan)
}
// cancelAllInvokes aborts every pending invocation: its result channel is
// closed, and its error channel receives a "message loop ended" error before
// being closed as well. Afterwards no invocation is registered anymore.
func (i *invokeClient) cancelAllInvokes() {
	i.mx.Lock()
	defer i.mx.Unlock()

	for _, pending := range i.resultChans {
		close(pending.resultChan)
		// The error is sent from a separate goroutine so that this loop never
		// waits on a channel send while the mutex is held.
		errs := pending.errChan
		go func() {
			errs <- errors.New("message loop ended")
			close(errs)
		}()
	}
	// Start over with an empty map instead of deleting the entries one by one.
	i.resultChans = make(map[string]invocationResultChans)
}
// handlesInvocationID reports whether an invocation with the given ID is
// currently pending in this client.
func (i *invokeClient) handlesInvocationID(invocationID string) bool {
	i.mx.Lock()
	defer i.mx.Unlock()

	_, known := i.resultChans[invocationID]
	return known
}
// receiveCompletionItem routes a completion message to the channels of the
// pending invocation it belongs to.
//
// A completion carrying an error delivers only that error on the error
// channel. A completion carrying a result delivers the unmarshaled result on
// the result channel, followed by a nil error on the error channel. A
// completion with neither delivers nothing. In every case the invocation is
// deleted when this method returns, which closes both of its channels.
//
// It returns an error when the invocation ID is unknown, when the result
// cannot be unmarshaled, or a *hubChanTimeoutError when delivery does not
// finish within chanReceiveTimeout.
func (i *invokeClient) receiveCompletionItem(completion completionMessage) error {
	defer i.deleteInvocation(completion.InvocationID)

	i.mx.Lock()
	ir, ok := i.resultChans[completion.InvocationID]
	i.mx.Unlock()
	if !ok {
		return fmt.Errorf(`unknown completion id "%v"`, completion.InvocationID)
	}

	if completion.Error != "" {
		// The send happens directly inside the select rather than in a helper
		// goroutine: on timeout the send is simply abandoned, so nothing is
		// left behind that could block forever or still be sending when the
		// deferred deleteInvocation closes the channel.
		timeout := time.NewTimer(i.chanReceiveTimeout)
		defer timeout.Stop()
		select {
		case ir.errChan <- errors.New(completion.Error):
			return nil
		case <-timeout.C:
			return &hubChanTimeoutError{fmt.Sprintf("timeout (%v) waiting for hub to receive client sent error", i.chanReceiveTimeout)}
		}
	}

	if completion.Result == nil {
		// Neither error nor result: closing the channels on return is the
		// only signal the waiting side gets.
		return nil
	}

	var result interface{}
	if err := i.protocol.UnmarshalArgument(completion.Result, &result); err != nil {
		return err
	}

	// A single timer covers both sends, so the total delivery time is bounded
	// by chanReceiveTimeout.
	timeout := time.NewTimer(i.chanReceiveTimeout)
	defer timeout.Stop()
	select {
	case ir.resultChan <- result:
	case <-timeout.C:
		return &hubChanTimeoutError{fmt.Sprintf("timeout (%v) waiting for hub to receive client sent value", i.chanReceiveTimeout)}
	}
	// completion.Error is known to be empty here, so the result is always
	// followed by a nil error.
	select {
	case ir.errChan <- nil:
		return nil
	case <-timeout.C:
		return &hubChanTimeoutError{fmt.Sprintf("timeout (%v) waiting for hub to receive client sent value", i.chanReceiveTimeout)}
	}
}