forked from ipfs-cluster/ipfs-cluster
-
Notifications
You must be signed in to change notification settings - Fork 2
/
request.go
149 lines (128 loc) · 3.02 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package client
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/elastos/Elastos.NET.Hive.Cluster/api"
)
// responseDecoder is a callback that consumes values from a JSON decoder.
// handleStreamResponse invokes it repeatedly with a decoder wrapping the
// response body: returning io.EOF ends the loop normally, any other non-nil
// error aborts the stream and is returned to the caller.
type responseDecoder func(d *json.Decoder) error
// do sends a request through doRequest and hands the response to
// handleResponse, which decodes it into obj.
//
// An error from building or sending the request is returned as an
// *api.Error with Code 0 and the underlying error text as Message.
func (c *defaultClient) do(
	method, path string,
	headers map[string]string,
	body io.Reader,
	obj interface{},
) error {
	response, reqErr := c.doRequest(method, path, headers, body)
	if reqErr == nil {
		return c.handleResponse(response, obj)
	}
	return &api.Error{Code: 0, Message: reqErr.Error()}
}
// doStream sends a request through doRequest and hands the response to
// handleStreamResponse, which feeds the body to outHandler.
//
// An error from building or sending the request is returned as an
// *api.Error with Code 0 and the underlying error text as Message.
func (c *defaultClient) doStream(
	method, path string,
	headers map[string]string,
	body io.Reader,
	outHandler responseDecoder,
) error {
	response, reqErr := c.doRequest(method, path, headers, body)
	if reqErr == nil {
		return c.handleStreamResponse(response, outHandler)
	}
	return &api.Error{Code: 0, Message: reqErr.Error()}
}
// doRequest builds an HTTP request for path on the client's configured
// scheme and host, applies the client configuration and the given headers,
// and executes it with the underlying HTTP client.
//
// The response is returned untouched; the caller is responsible for reading
// and closing its body.
func (c *defaultClient) doRequest(
	method, path string,
	headers map[string]string,
	body io.Reader,
) (*http.Response, error) {
	// Exactly one slash separates host and path, whether or not the caller
	// supplied a leading one.
	target := c.net + "://" + c.hostname + "/" + strings.TrimPrefix(path, "/")
	logger.Debugf("%s: %s", method, target)

	req, err := http.NewRequest(method, target, body)
	if err != nil {
		return nil, err
	}

	if c.config.DisableKeepAlives {
		req.Close = true
	}

	if user := c.config.Username; user != "" {
		req.SetBasicAuth(user, c.config.Password)
	}

	// Ranging over a nil map is a no-op, so no explicit nil guard is needed.
	for name, value := range headers {
		req.Header.Set(name, value)
	}

	if body != nil {
		req.ContentLength = -1 // this lets go use "chunked".
	}

	return c.client.Do(req)
}
// handleResponse reads resp.Body to the end, closes it, and interprets the
// result according to the status code:
//
//   - 202 Accepted and 204 No Content: nothing is decoded; nil is returned.
//   - 4xx and 5xx: the body is decoded as an api.Error, which is returned.
//     If the body is not JSON, or is JSON that leaves Code or Message unset,
//     the HTTP status code and the raw body are used instead so the caller
//     never receives an empty error.
//   - any other status: the body is JSON-decoded into obj.
//
// Every failure, including read and decode errors, is returned as an
// *api.Error carrying the HTTP status code.
func (c *defaultClient) handleResponse(resp *http.Response, obj interface{}) error {
	body, err := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return &api.Error{Code: resp.StatusCode, Message: err.Error()}
	}
	logger.Debugf("Response body: %s", body)

	switch {
	case resp.StatusCode == http.StatusAccepted:
		logger.Debug("Request accepted")
		return nil
	case resp.StatusCode == http.StatusNoContent:
		logger.Debug("Request succeeded. Response has no content")
		return nil
	case resp.StatusCode > 399 && resp.StatusCode < 600:
		var apiErr api.Error
		if err := json.Unmarshal(body, &apiErr); err != nil {
			// not json. 404s etc.
			return &api.Error{
				Code:    resp.StatusCode,
				Message: string(body),
			}
		}
		// Valid JSON that is not shaped like an api.Error ("null", "{}", an
		// error object produced by an intermediary) unmarshals without error
		// but leaves the fields zeroed. Fill them from the HTTP response so
		// the failure stays identifiable.
		if apiErr.Code == 0 {
			apiErr.Code = resp.StatusCode
		}
		if apiErr.Message == "" {
			apiErr.Message = string(body)
		}
		return &apiErr
	}

	if err := json.Unmarshal(body, obj); err != nil {
		return &api.Error{
			Code:    resp.StatusCode,
			Message: err.Error(),
		}
	}
	return nil
}
// handleStreamResponse processes a streaming JSON response.
//
// 4xx and 5xx responses are delegated to handleResponse. Any other status
// except 200 is rejected with an *api.Error. For a 200 response, handler is
// called repeatedly with a decoder over the body until it returns an error:
// io.EOF ends the stream normally, anything else is logged and returned
// as-is. After a normal end, a non-empty "X-Stream-Error" trailer is
// reported as an *api.Error with code 500.
func (c *defaultClient) handleStreamResponse(resp *http.Response, handler responseDecoder) error {
	if code := resp.StatusCode; code > 399 && code < 600 {
		// handleResponse reads and closes the body itself.
		return c.handleResponse(resp, nil)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return &api.Error{
			Code:    resp.StatusCode,
			Message: "expected streaming response with code 200",
		}
	}

	decoder := json.NewDecoder(resp.Body)
	var err error
	for err == nil {
		err = handler(decoder)
	}
	if err != io.EOF {
		logger.Error(err)
		return err
	}

	// The stream ended cleanly; we need to check trailers for an error
	// reported after the body was sent.
	if msg := resp.Trailer.Get("X-Stream-Error"); msg != "" {
		return &api.Error{
			Code:    http.StatusInternalServerError,
			Message: msg,
		}
	}
	return nil
}