-
Notifications
You must be signed in to change notification settings - Fork 0
/
user.go
106 lines (91 loc) · 3.42 KB
/
user.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// user.go provides a facility for proxying user-generated bytestreams, i.e., streams which
// originate from a process running on the user's computer (like a web browser) or streams which
// are generated as part of the user's Lantern control plane activity. We can operationalize a
// user stream in a WorkerFSM just like we do an RTCPeerConnection or a websocket.Conn, such that
// bytestreams from the user can be neatly managed alongside bytestreams from remote peers in
// their consumer table. In other words: we treat the user's web browser just like any connected consumer.
package clientcore
import (
"context"
"net"
"sync"
"time"
"github.com/google/uuid"
"github.com/getlantern/broflake/common"
)
// BroflakeConn is a net.PacketConn whose reads and writes are carried over a pair of IPCMsg
// channels rather than a socket. It defines its own LocalAddr, ReadFrom, WriteTo and
// SetReadDeadline; every other net.PacketConn method is supplied by the embedded PacketConn.
type BroflakeConn struct {
// Embedded connection that provides the net.PacketConn methods not defined on BroflakeConn.
net.PacketConn
// Channel that WriteTo sends outgoing chunks on.
writeChan chan IPCMsg
// Channel that ReadFrom receives incoming chunks from.
readChan chan IPCMsg
// Address returned by LocalAddr.
addr common.DebugAddr
// Deadline that ReadFrom starts waiting against; the zero value means "never expire".
readDeadline time.Time
// Channel carrying new read deadlines from SetReadDeadline to ReadFrom.
updateReadDeadline chan time.Time
}
// LocalAddr reports the address stored on this connection. It takes the place of the
// LocalAddr method that the embedded net.PacketConn would otherwise provide.
func (c BroflakeConn) LocalAddr() net.Addr {
	var local net.Addr = c.addr
	return local
}
// ReadFrom waits for the next chunk on readChan and copies it into p.
//
// On success it returns the number of bytes actually copied into p, a placeholder debug
// address, and a nil error. If p is shorter than the chunk, the excess bytes are discarded and
// n reflects only what fit in p.
//
// A zero readDeadline means the read never times out. Otherwise, once the deadline passes,
// ReadFrom returns 0 and the expired context's error. While waiting, deadlines sent by
// SetReadDeadline are received from updateReadDeadline and the wait restarts against the new
// deadline. Because the receiver is a value, a deadline received here is stored only on this
// call's copy of the connection.
//
// ReadFrom panics if a received message's Data is not a []byte.
func (c BroflakeConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
	for {
		// With no deadline we wait on the background context, which is never done, so the
		// ctx.Done() case below simply never fires. With a deadline we arm a context that
		// expires at that instant.
		ctx, cancel := context.Background(), context.CancelFunc(func() {})
		if !c.readDeadline.IsZero() {
			ctx, cancel = context.WithDeadline(context.Background(), c.readDeadline)
		}

		// Every branch calls cancel so the context created for this iteration (and its
		// timer) is released immediately instead of lingering until the deadline.
		select {
		case msg := <-c.readChan:
			// The read completed, let's return some bytes!
			cancel()
			payload := msg.Data.([]byte)
			// Report what was really copied; len(payload) may exceed len(p).
			n = copy(p, payload)
			return n, common.DebugAddr("DEBUG NELSON WUZ HERE"), nil
		case <-ctx.Done():
			// We're past our deadline, so let's return failure!
			err = ctx.Err()
			cancel()
			return 0, common.DebugAddr("DEBUG NELSON WUZ HERE"), err
		case d := <-c.updateReadDeadline:
			// Someone updated the read deadline, so let's iterate to respect the new deadline
			cancel()
			c.readDeadline = d
		}
	}
}
// WriteTo offers a copy of p to writeChan as a ChunkIPC message without ever blocking.
// If the channel cannot accept the message right away, the chunk is silently dropped.
// Either way it reports len(p) bytes written and a nil error. The addr argument is ignored.
func (c BroflakeConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
	// TODO: Sending a private copy rather than p itself seems necessary to avoid a data race
	chunk := make([]byte, len(p))
	copy(chunk, p)
	outgoing := IPCMsg{IpcType: ChunkIPC, Data: chunk}

	select {
	case c.writeChan <- outgoing:
		// Message queued; nothing more to do
	default:
		// We can't keep up with the data rate, so this chunk is dropped
	}

	return len(chunk), nil
}
// SetReadDeadline passes the new read deadline t to ReadFrom by sending it on the
// updateReadDeadline channel, and always returns nil. This is a plain channel send, so the
// call blocks whenever the channel cannot accept another value.
//
// XXX: A note about deadlines: as of quic-go 0.34, the QUIC dialer didn't seem to care about
// read or write deadlines, and it was happy to use a net.PacketConn which didn't properly
// implement them. After the bump to quic-go 0.40, it emerged that the dialer wouldn't work
// unless read deadlines were supported. There's still no evidence that the dialer cares about
// write deadlines, so support for those hasn't been added yet.
func (c BroflakeConn) SetReadDeadline(t time.Time) error {
	updates := c.updateReadDeadline
	updates <- t
	return nil
}
// NewProducerUserStream builds a WorkerFSM for a user stream together with the BroflakeConn
// that talks to it, and returns both.
//
// The worker is created with wg and a single state which logs a debug line and then blocks
// forever. The returned connection writes to the worker's com.tx channel, reads from its
// com.rx channel, uses a fresh UUID string as its local address, and starts with no read
// deadline.
func NewProducerUserStream(wg *sync.WaitGroup) (*BroflakeConn, *WorkerFSM) {
	// State 0
	// (no input data)
	idle := FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) {
		common.Debugf("User stream producer state 0...")
		// TODO: check for a non-nil path assertion to alert the UI that we're ready to proxy?
		select {}
	})

	worker := NewWorkerFSM(wg, []FSMstate{idle})

	return &BroflakeConn{
		// Zero-value UDPConn embedded to provide the net.PacketConn methods BroflakeConn
		// does not define itself.
		PacketConn: &net.UDPConn{},
		writeChan:  worker.com.tx,
		readChan:   worker.com.rx,
		addr:       common.DebugAddr(uuid.NewString()),
		// Zero value: no read deadline until one arrives via SetReadDeadline.
		readDeadline:       time.Time{},
		updateReadDeadline: make(chan time.Time, 512),
	}, worker
}