forked from zyxar/argo
/
http.go
124 lines (118 loc) · 2.71 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package argo
import (
"context"
"github.com/gorilla/websocket"
"log"
"net"
"net/http"
"net/url"
"sync"
"time"
)
type HttpCaller struct {
uri string
c *http.Client
cancel context.CancelFunc
wg *sync.WaitGroup
once sync.Once
}
func newHTTPCaller(ctx context.Context, u *url.URL, timeout time.Duration, notifer Notifier) *HttpCaller {
c := &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 1,
MaxConnsPerHost: 1,
DialContext: (&net.Dialer{
Timeout: timeout,
KeepAlive: 60 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 3 * time.Second,
ResponseHeaderTimeout: timeout,
},
}
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)
h := &HttpCaller{uri: u.String(), c: c, cancel: cancel, wg: &wg}
if notifer != nil {
_ = h.setNotifier(ctx, *u, notifer)
}
return h
}
func (h *HttpCaller) Close() {
h.once.Do(func() {
h.cancel()
h.wg.Wait()
})
}
func (h *HttpCaller) setNotifier(ctx context.Context, u url.URL, notifer Notifier) (err error) {
u.Scheme = "ws"
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
return
}
h.wg.Add(1)
go func() {
defer h.wg.Done()
defer func() { _ = conn.Close() }()
select {
case <-ctx.Done():
_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
if err = conn.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
log.Printf("sending websocket close message: %v", err)
}
return
}
}()
h.wg.Add(1)
go func() {
defer h.wg.Done()
var request websocketResponse
//var err error
for {
select {
case <-ctx.Done():
return
default:
}
if err = conn.ReadJSON(&request); err != nil {
select {
case <-ctx.Done():
return
default:
}
log.Printf("conn.ReadJSON|err:%v", err.Error())
return
}
switch request.Method {
case onDownloadStart:
notifer.OnDownloadStart(request.Params)
case onDownloadPause:
notifer.OnDownloadPause(request.Params)
case onDownloadStop:
notifer.OnDownloadStop(request.Params)
case onDownloadComplete:
notifer.OnDownloadComplete(request.Params)
case onDownloadError:
notifer.OnDownloadError(request.Params)
case onBtDownloadComplete:
notifer.OnBtDownloadComplete(request.Params)
default:
log.Printf("unexpected notification: %s", request.Method)
}
}
}()
return
}
func (h *HttpCaller) Call(method string, params, reply interface{}) (err error) {
payload, err := EncodeClientRequest(method, params)
if err != nil {
return
}
r, err := h.c.Post(h.uri, "application/json", payload)
if err != nil {
return
}
err = DecodeClientResponse(r.Body, &reply)
_ = r.Body.Close()
return err
}