generated from alexejk/go-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
codec.go
163 lines (124 loc) · 3.22 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package xmlrpc
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/rpc"
"net/url"
"sync"
)
// defaultUserAgent is the User-Agent header value that NewCodec assigns to new codecs.
const defaultUserAgent = "alexejk.io/go-xmlrpc"
// Codec implements methods required by rpc.ClientCodec.
// In this implementation Codec is the one performing actual RPC requests with http.Client.
type Codec struct {
// endpoint is the URL every request is POSTed to.
endpoint *url.URL
// httpClient performs the HTTP round trips; Close closes its idle connections.
httpClient *http.Client
// customHeaders are applied to each outgoing request after the default
// Content-Type and User-Agent headers, so they can override those defaults.
customHeaders map[string]string
// mutex guards access to pending.
mutex sync.Mutex
// pending contains completed but not yet processed HTTP responses, keyed by sequence ID.
pending map[uint64]*rpcCall
// response is the current in-flight response: it is stored by ReadResponseHeader
// and decoded by ReadResponseBody.
response *Response
// encoder writes the method name and arguments into the request body.
encoder Encoder
// decoder extracts faults and result values from a Response.
decoder Decoder
// ready passes the sequence IDs of completed requests from WriteRequest to ReadResponseHeader.
ready chan uint64
// userAgent is sent as the User-Agent header of each request.
userAgent string
}
// rpcCall pairs the HTTP response of a completed request with the identifying
// data of the RPC call it belongs to, until ReadResponseHeader picks it up.
type rpcCall struct {
// Seq is the sequence number copied from the rpc.Request.
Seq uint64
// ServiceMethod is the method name copied from the rpc.Request.
ServiceMethod string
// httpResponse is the response returned by the HTTP client; its body is
// read and closed by ReadResponseHeader.
httpResponse *http.Response
}
// NewCodec creates a new Codec bound to provided endpoint.
// Provided client will be used to perform RPC requests.
//
// If httpClient is nil, http.DefaultClient is used instead, so that
// WriteRequest and Close never call through a nil client. In that case Close
// closes the idle connections of the shared default client.
//
// The returned Codec uses StdEncoder and StdDecoder, starts with no pending
// calls and no in-flight response, and sends defaultUserAgent as User-Agent.
func NewCodec(endpoint *url.URL, httpClient *http.Client) *Codec {
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	return &Codec{
		endpoint:   endpoint,
		httpClient: httpClient,
		encoder:    &StdEncoder{},
		decoder:    &StdDecoder{},
		pending:    make(map[uint64]*rpcCall),
		response:   nil,
		// Unbuffered: WriteRequest hands each sequence ID directly to ReadResponseHeader.
		ready:     make(chan uint64),
		userAgent: defaultUserAgent,
	}
}
// WriteRequest encodes the call described by req and args, POSTs it to the
// codec's endpoint and, once the HTTP response has arrived, registers that
// response under req.Seq and announces the sequence ID on the ready channel.
//
// Encoding, request construction and transport errors are returned as-is;
// in those cases nothing is registered or announced.
func (c *Codec) WriteRequest(req *rpc.Request, args interface{}) error {
	var payload bytes.Buffer
	if err := c.encoder.Encode(&payload, req.ServiceMethod, args); err != nil {
		return err
	}
	contentLength := fmt.Sprintf("%d", payload.Len())

	httpReq, err := http.NewRequest(http.MethodPost, c.endpoint.String(), &payload)
	if err != nil {
		return err
	}

	headers := httpReq.Header
	headers.Set("Content-Type", "text/xml")
	headers.Set("User-Agent", c.userAgent)
	// Custom headers are applied after the defaults so they can override them.
	for name, val := range c.customHeaders {
		headers.Set(name, val)
	}
	// Content-Length is written last, so custom headers cannot replace it.
	headers.Set("Content-Length", contentLength)

	httpResp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return err
	}

	call := &rpcCall{
		Seq:           req.Seq,
		ServiceMethod: req.ServiceMethod,
		httpResponse:  httpResp,
	}

	c.mutex.Lock()
	c.pending[call.Seq] = call
	c.mutex.Unlock()

	// Hand the sequence ID over to ReadResponseHeader.
	c.ready <- call.Seq

	return nil
}
// ReadResponseHeader waits for the next sequence ID announced by WriteRequest,
// removes the matching call from the pending set and fills resp with its
// sequence number and service method.
//
// The HTTP response body is read in full and closed here. Failures tied to
// the call itself (a non-2xx status code, a body read error, an undecodable
// response or a decoded fault) are reported through resp.Error while nil is
// returned. On success the decoded response is kept for ReadResponseBody.
//
// A non-nil error is returned only when the announced sequence ID has no
// pending call.
func (c *Codec) ReadResponseHeader(resp *rpc.Response) error {
	seq := <-c.ready

	c.mutex.Lock()
	call, found := c.pending[seq]
	delete(c.pending, seq)
	c.mutex.Unlock()

	// Guard against a missing entry instead of dereferencing a nil call.
	if !found || call == nil {
		return fmt.Errorf("no pending call found for sequence %d", seq)
	}

	resp.Seq = call.Seq
	resp.ServiceMethod = call.ServiceMethod

	// Forget the previous call's response so a failure below cannot leave
	// stale data behind for ReadResponseBody.
	c.response = nil

	r := call.httpResponse
	defer r.Body.Close()

	// Always read the body to EOF, even for error statuses: a body that is
	// closed without being fully read can prevent the transport from reusing
	// the underlying keep-alive connection.
	body, readErr := ioutil.ReadAll(r.Body)

	if r.StatusCode < 200 || r.StatusCode >= 300 {
		resp.Error = fmt.Sprintf("bad response code: %d", r.StatusCode)
		return nil
	}

	if readErr != nil {
		resp.Error = readErr.Error()
		return nil
	}

	decodableResponse, err := NewResponse(body)
	if err != nil {
		resp.Error = err.Error()
		return nil
	}

	// Return response Fault already at this stage
	if err := c.decoder.DecodeFault(decodableResponse); err != nil {
		resp.Error = err.Error()
		return nil
	}

	c.response = decodableResponse

	return nil
}
// ReadResponseBody decodes the response stored by ReadResponseHeader into v.
//
// A nil v means the caller wants the body discarded, so nothing is decoded
// and nil is returned. If no response is stored, an error is returned.
func (c *Codec) ReadResponseBody(v interface{}) error {
	switch {
	case v == nil:
		return nil
	case c.response == nil:
		return errors.New("no in-flight response found")
	default:
		return c.decoder.Decode(c.response, v)
	}
}
// Close closes the idle connections held by the codec's HTTP client.
// It always returns nil.
func (c *Codec) Close() error {
	client := c.httpClient
	client.CloseIdleConnections()

	return nil
}