/
state.go
133 lines (115 loc) · 3 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package session
import (
"fmt"
"io"
"time"
"github.com/pions/webrtc"
)
func (s *Session) setStateManager() {
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
s.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
fmt.Printf("Connection state is %v\n", s.peerConnection.ConnectionState)
if connectionState == webrtc.ICEConnectionStateDisconnected {
s.stopSending <- struct{}{}
}
})
}
func (s *Session) writeToNetwork() {
fmt.Println("Starting to send data...")
defer fmt.Println("Stopped sending data...")
for {
SELECT:
select {
case <-s.stopSending:
fmt.Printf("Pausing network I/O... (remaining at least %v packets)", len(s.output))
return
case data := <-s.output:
if data.n == 0 {
// The channel is closed, nothing more to send
s.close(false)
return
}
s.msgToBeSent = append(s.msgToBeSent, data)
for len(s.msgToBeSent) != 0 {
cur := s.msgToBeSent[0]
// TODO: Correct check
if s.dataChannel.ReadyState != webrtc.DataChannelStateOpen {
fmt.Printf("Status: %v, dropping %v bytes\n", s.dataChannel.ReadyState, data.n)
break SELECT
}
// Writing packet
if err := s.dataChannel.Send(cur.buff); err != nil {
fmt.Printf("Error, cannot send to client: %v\n", err)
return
}
s.nbBytesSent += uint64(cur.n)
s.msgToBeSent = s.msgToBeSent[1:]
}
}
}
}
func (s *Session) readFile() {
fmt.Println("Starting to read data...")
defer func() {
fmt.Println("Stopped reading data...")
close(s.output)
}()
for {
// Read file
s.dataBuff = s.dataBuff[:cap(s.dataBuff)]
n, err := s.stream.Read(s.dataBuff)
if err != nil {
if err == io.EOF {
fmt.Printf("Got EOF after %v bytes!\n", s.nbBytesRead)
return
}
fmt.Printf("Read Error: %v\n", err)
return
}
s.dataBuff = s.dataBuff[:n]
s.nbBytesRead += uint64(n)
s.output <- outputMsg{
n: n,
// Make a copy of the buffer
buff: append([]byte(nil), s.dataBuff...),
}
}
}
func (s *Session) onOpenHandler() func() {
return func() {
if s.timeStart.IsZero() {
s.timeStart = time.Now()
}
s.writeToNetwork()
}
}
func (s *Session) dumpStats() {
duration := time.Since(s.timeStart)
speedMb := (float64(s.nbBytesSent) / 1024 / 1024) / duration.Seconds()
fmt.Printf("Bytes read: %v\n", s.nbBytesRead)
fmt.Printf("Bytes sent: %v\n", s.nbBytesSent)
fmt.Printf("Duration: %v\n", duration.String())
fmt.Printf("Speed: %.04f MB/s\n", speedMb)
}
func (s *Session) close(calledFromCloseHandler bool) {
if calledFromCloseHandler == false {
s.dataChannel.Close()
}
// Sometime, onCloseHandler is not invoked, so it's a work-around
s.doneCheckLock.Lock()
if s.doneCheck == true {
s.doneCheckLock.Unlock()
return
}
s.doneCheck = true
s.doneCheckLock.Unlock()
s.dumpStats()
close(s.done)
}
func (s *Session) onCloseHandler() func() {
return func() {
s.close(true)
}
}