This repository has been archived by the owner on May 28, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
client.go
129 lines (113 loc) · 3.64 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package findy
import (
"context"
"errors"
"io"
"github.com/findy-network/findy-agent-vault/agency/model"
"github.com/findy-network/findy-agent-vault/utils"
"github.com/findy-network/findy-common-go/agency/client"
"github.com/findy-network/findy-common-go/agency/client/async"
agency "github.com/findy-network/findy-common-go/grpc/agency/v1"
ops "github.com/findy-network/findy-common-go/grpc/ops/v1"
"github.com/findy-network/findy-common-go/jwt"
"github.com/findy-network/findy-common-go/rpc"
"github.com/golang/glog"
"github.com/lainio/err2"
"github.com/lainio/err2/try"
"golang.org/x/oauth2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/oauth"
)
type clientConn interface {
release(id string, protocolType agency.Protocol_Type) (pid *agency.ProtocolID, err error)
status(id string, protocolType agency.Protocol_Type) (pid *agency.ProtocolStatus, err error)
listen(id string) (ch chan *AgentStatus, err error)
psmHook() (ch chan *ops.AgencyStatus, err error)
}
type Client struct {
*client.Conn
ctx context.Context
cOpts []grpc.CallOption
}
func (f *Agency) callOptions(jwtToken string) []grpc.CallOption {
// Bypass security measures in insecure mode
if f.agencyInsecure {
conf := &rpc.ClientCfg{
JWT: jwtToken,
Addr: f.connConfig.Addr,
Opts: f.connConfig.Opts,
Insecure: f.connConfig.Insecure,
}
return []grpc.CallOption{
grpc.PerRPCCredentials(conf),
}
}
return []grpc.CallOption{
grpc.PerRPCCredentials(
oauth.TokenSource{
TokenSource: oauth2.StaticTokenSource(jwt.OauthToken(jwtToken)),
},
),
}
}
// Connection configuration for "sync" requests coming directly from web wallet
func (f *Agency) userSyncClient(a *model.Agent, connectionID string) *async.Pairwise {
opts := f.callOptions(a.RawJWT)
return async.NewPairwise(f.conn, connectionID, opts...)
}
// Connection configuration for "async" requests, done on behalf of the web wallet
func (f *Agency) getUserAsyncClient(a *model.Agent) clientConn {
opts := f.callOptions(jwt.BuildJWT(a.AgentID))
return &Client{&f.conn, f.ctx, opts}
}
// Connection configuration for agency administrative client
func (f *Agency) adminClient() *Client {
opts := f.callOptions(jwt.BuildJWT(f.agencyAdminID))
return &Client{&f.conn, f.ctx, opts}
}
func (c *Client) release(id string, protocolType agency.Protocol_Type) (pid *agency.ProtocolID, err error) {
protocolID := &agency.ProtocolID{
ID: id,
TypeID: protocolType,
}
return c.Conn.DoRelease(c.ctx, protocolID, c.cOpts...)
}
func (c *Client) status(id string, protocolType agency.Protocol_Type) (pid *agency.ProtocolStatus, err error) {
protocolID := &agency.ProtocolID{
ID: id,
TypeID: protocolType,
}
return c.Conn.DoStatus(c.ctx, protocolID, c.cOpts...)
}
type AgentStatus struct {
status *agency.AgentStatus
err error
}
func (c *Client) listen(id string) (ch chan *AgentStatus, err error) {
clientID := &agency.ClientID{ID: id}
defer err2.Handle(&err)
agentClient := agency.NewAgentServiceClient(c.ClientConn)
statusCh := make(chan *AgentStatus)
stream := try.To1(agentClient.Listen(c.ctx, clientID, c.cOpts...))
utils.LogLow().Infoln("successful start of listen id:", clientID.ID)
go func() {
defer err2.Catch(err2.Err(func(err error) {
glog.Warningln("WARNING: error when reading response:", err)
close(statusCh)
}))
for {
status, err := stream.Recv()
if errors.Is(err, io.EOF) {
glog.Warningln("status stream end")
close(statusCh)
break
}
statusCh <- &AgentStatus{status, err}
try.To(err)
}
}()
return statusCh, nil
}
func (c *Client) psmHook() (ch chan *ops.AgencyStatus, err error) {
return c.Conn.PSMHook(c.ctx, c.cOpts...)
}