forked from daos-stack/daos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
drpc_client.go
168 lines (141 loc) · 4.34 KB
/
drpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
//
// (C) Copyright 2019 Intel Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// GOVERNMENT LICENSE RIGHTS-OPEN SOURCE SOFTWARE
// The Government's rights to use, modify, reproduce, release, perform, display,
// or disclose this software are subject to the terms of the Apache License as
// provided in Contract No. 8F-30005.
// Any reproduction of computer software, computer software documentation, or
// portions thereof marked with this legend must also reproduce the markings.
//
package drpc
import (
"net"
"sync"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// DomainSocketClient is the interface to a dRPC client communicating over a
// Unix Domain Socket
type DomainSocketClient interface {
sync.Locker
IsConnected() bool
Connect() error
Close() error
SendMsg(call *Call) (*Response, error)
}
// domainSocketDialer is an interface that connects to a Unix Domain Socket
type domainSocketDialer interface {
dial(socketPath string) (net.Conn, error)
}
// ClientConnection represents a client connection to a dRPC server
type ClientConnection struct {
sync.Mutex
socketPath string // Filesystem location of dRPC socket
dialer domainSocketDialer // Interface to connect to the socket
conn net.Conn // Connection to socket
sequence int64 // Increment each time we send
}
// IsConnected indicates whether the client connection is currently active
func (c *ClientConnection) IsConnected() bool {
return c.conn != nil
}
// Connect opens a connection to the internal Unix Domain Socket path
func (c *ClientConnection) Connect() error {
if c.IsConnected() {
// Nothing to do
return nil
}
conn, err := c.dialer.dial(c.socketPath)
if err != nil {
return errors.Wrap(err, "dRPC connect")
}
c.conn = conn
c.sequence = 0 // reset message sequence number on connect
return nil
}
// Close shuts down the connection to the Unix Domain Socket
func (c *ClientConnection) Close() error {
if !c.IsConnected() {
// Nothing to do
return nil
}
err := c.conn.Close()
if err != nil {
return errors.Wrap(err, "dRPC close")
}
c.conn = nil
return nil
}
func (c *ClientConnection) sendCall(msg *Call) error {
// increment sequence every call, always nonzero
c.sequence++
msg.Sequence = c.sequence
callBytes, err := proto.Marshal(msg)
if err != nil {
return errors.Wrap(err, "failed to marshal dRPC request")
}
if _, err := c.conn.Write(callBytes); err != nil {
return errors.Wrap(err, "dRPC send")
}
return nil
}
func (c *ClientConnection) recvResponse() (*Response, error) {
respBytes := make([]byte, MaxMsgSize)
numBytes, err := c.conn.Read(respBytes)
if err != nil {
return nil, errors.Wrap(err, "dRPC recv")
}
resp := &Response{}
err = proto.Unmarshal(respBytes[:numBytes], resp)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal dRPC response")
}
return resp, nil
}
// SendMsg sends a message to the connected dRPC server, and returns the
// response to the caller.
func (c *ClientConnection) SendMsg(msg *Call) (*Response, error) {
if !c.IsConnected() {
return nil, errors.Errorf("dRPC not connected")
}
if msg == nil {
return nil, errors.Errorf("invalid dRPC call")
}
err := c.sendCall(msg)
if err != nil {
return nil, errors.WithStack(err)
}
return c.recvResponse()
}
// NewClientConnection creates a new dRPC client
func NewClientConnection(socket string) *ClientConnection {
return &ClientConnection{
socketPath: socket,
dialer: &clientDialer{},
}
}
// clientDialer is the concrete implementation of the domainSocketDialer
// interface for dRPC clients
type clientDialer struct {
}
// dial connects to the real unix domain socket located at socketPath
func (c *clientDialer) dial(socketPath string) (net.Conn, error) {
addr := &net.UnixAddr{
Net: "unixpacket",
Name: socketPath,
}
return net.DialUnix("unixpacket", nil, addr)
}