This repository has been archived by the owner on Jul 14, 2019. It is now read-only.
/
invite.go
131 lines (105 loc) · 2.27 KB
/
invite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package routes
import (
"context"
"io"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
// NewInviteHandler returns a new instance of InviteHandler
func NewInviteHandler() *InviteHandler {
upgrader := &websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
connections := make(map[string]*pending)
return &InviteHandler{
upgrader: upgrader,
connections: connections,
}
}
type pending struct {
w http.ResponseWriter
r *http.Request
cancel context.CancelFunc
}
func newPending(w http.ResponseWriter, r *http.Request) *pending {
parentCtx := r.Context()
ctx, cancel := context.WithCancel(parentCtx)
return &pending{
w: w,
r: r.WithContext(ctx),
cancel: cancel,
}
}
// InviteHandler rendezvous client pairs at an invite URL
type InviteHandler struct {
sync.RWMutex
upgrader *websocket.Upgrader
connections map[string]*pending
}
func (h *InviteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := mux.Vars(r)["key"]
if !websocket.IsWebSocketUpgrade(r) {
notFound(w)
return
}
h.RLock()
pending, ok := h.connections[key]
h.RUnlock()
if ok {
h.connect(w, r, pending)
return
}
pending = newPending(w, r)
h.Lock()
h.connections[key] = pending
h.Unlock()
<-pending.r.Context().Done()
h.Lock()
delete(h.connections, key)
h.Unlock()
}
// connect upgrades both (w, r) and (pending.w, pending.r) to websockets and
// connects them bidrectionally
func (h *InviteHandler) connect(w http.ResponseWriter, r *http.Request, pending *pending) {
src, err := h.upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
dest, err := h.upgrader.Upgrade(pending.w, pending.r, nil)
if err != nil {
src.Close()
return
}
select {
case <-pipeWebsocket(src, dest):
case <-pipeWebsocket(dest, src):
}
pending.cancel()
src.Close()
dest.Close()
}
func pipeWebsocket(src, dest *websocket.Conn) <-chan interface{} {
done := make(chan interface{})
go func() {
for {
messageType, r, err := src.NextReader()
if err != nil {
break
}
w, err := dest.NextWriter(messageType)
if err != nil {
break
}
if _, err := io.Copy(w, r); err != nil {
break
}
if err := w.Close(); err != nil {
break
}
}
close(done)
}()
return done
}