Skip to content

Commit

Permalink
Modified proxy-go to use pion/webrtc
Browse files Browse the repository at this point in the history
The API is very similar, differences were mostly due to:
- closing peer connections and datachannels (no destroy/delete methods)
- different way to set datachannel/peer connection callbacks
- differences in whether functions take pointers or values
- no serialize/deserialize functions in the API
  • Loading branch information
cohosh committed Jun 14, 2019
1 parent cd650fa commit 8770de7
Showing 1 changed file with 84 additions and 35 deletions.
119 changes: 84 additions & 35 deletions proxy-go/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/rand"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"io"
Expand All @@ -19,7 +20,7 @@ import (
"time"

"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"github.com/keroserene/go-webrtc"
"github.com/pion/webrtc"
"golang.org/x/net/websocket"
)

Expand All @@ -43,7 +44,7 @@ const (

var (
tokens chan bool
config *webrtc.Configuration
config webrtc.Configuration
client http.Client
)

Expand Down Expand Up @@ -92,7 +93,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) {

func (c *webRTCConn) Close() (err error) {
c.once.Do(func() {
err = c.pc.Destroy()
err = c.pc.Close()
})
return
}
Expand All @@ -103,7 +104,7 @@ func (c *webRTCConn) LocalAddr() net.Addr {

func (c *webRTCConn) RemoteAddr() net.Addr {
//Parse Remote SDP offer and extract client IP
clientIP := remoteIPFromSDP(c.pc.RemoteDescription().Sdp)
clientIP := remoteIPFromSDP(c.pc.RemoteDescription().SDP)
if clientIP == nil {
return nil
}
Expand Down Expand Up @@ -178,7 +179,7 @@ func pollOffer(sid string) *webrtc.SessionDescription {
if err != nil {
log.Printf("error reading broker response: %s", err)
} else {
return webrtc.DeserializeSessionDescription(string(body))
return deserializeSessionDescription(string(body))
}
}
}
Expand All @@ -187,7 +188,7 @@ func pollOffer(sid string) *webrtc.SessionDescription {

func sendAnswer(sid string, pc *webrtc.PeerConnection) error {
broker := brokerURL.ResolveReference(&url.URL{Path: "answer"})
body := bytes.NewBuffer([]byte(pc.LocalDescription().Serialize()))
body := bytes.NewBuffer([]byte(serializeSessionDescription(pc.LocalDescription())))
req, _ := http.NewRequest("POST", broker.String(), body)
req.Header.Set("X-Session-ID", sid)
resp, err := client.Do(req)
Expand Down Expand Up @@ -275,71 +276,63 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
}
pc.OnNegotiationNeeded = func() {
panic("OnNegotiationNeeded")
}
pc.OnDataChannel = func(dc *webrtc.DataChannel) {
pc.OnDataChannel(func(dc *webrtc.DataChannel) {
log.Println("OnDataChannel")
close(dataChan)

pr, pw := io.Pipe()
conn := &webRTCConn{pc: pc, dc: dc, pr: pr}

dc.OnOpen = func() {
dc.OnOpen(func() {
log.Println("OnOpen channel")
}
dc.OnClose = func() {
})
dc.OnClose(func() {
conn.lock.Lock()
defer conn.lock.Unlock()
log.Println("OnClose channel")
conn.dc = nil
pc.DeleteDataChannel(dc)
dc.Close()
pw.Close()
}
dc.OnMessage = func(msg []byte) {
log.Printf("OnMessage <--- %d bytes", len(msg))
n, err := pw.Write(msg)
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
log.Printf("OnMessage <--- %d bytes", len(msg.Data))
n, err := pw.Write(msg.Data)
if err != nil {
pw.CloseWithError(err)
}
if n != len(msg) {
if n != len(msg.Data) {
panic("short write")
}
}
})

go datachannelHandler(conn, conn.RemoteAddr())
}
})

err = pc.SetRemoteDescription(sdp)
err = pc.SetRemoteDescription(*sdp)
if err != nil {
pc.Destroy()
pc.Close()
return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
}
log.Println("sdp offer successfully received.")

log.Println("Generating answer...")
answer, err := pc.CreateAnswer()
answer, err := pc.CreateAnswer(nil)
// blocks on ICE gathering. we need to add a timeout if needed
// not putting this in a separate go routine, because we need
// SetLocalDescription(answer) to be called before sendAnswer
if err != nil {
pc.Destroy()
pc.Close()
return nil, err
}

if answer == nil {
pc.Destroy()
return nil, fmt.Errorf("Failed gathering ICE candidates.")
}

err = pc.SetLocalDescription(answer)
if err != nil {
pc.Destroy()
pc.Close()
return nil, err
}

Expand All @@ -363,7 +356,7 @@ func runSession(sid string) {
err = sendAnswer(sid, pc)
if err != nil {
log.Printf("error sending answer to client through broker: %s", err)
pc.Destroy()
pc.Close()
retToken()
return
}
Expand All @@ -375,7 +368,7 @@ func runSession(sid string) {
log.Println("Connection successful.")
case <-time.After(dataChannelTimeout):
log.Println("Timed out waiting for client to open data channel.")
pc.Destroy()
pc.Close()
retToken()
}
}
Expand Down Expand Up @@ -422,7 +415,13 @@ func main() {
log.Fatalf("invalid relay url: %s", err)
}

config = webrtc.NewConfiguration(webrtc.OptionIceServer(stunURL))
config = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{stunURL},
},
},
}
tokens = make(chan bool, capacity)
for i := uint(0); i < capacity; i++ {
tokens <- true
Expand All @@ -434,3 +433,53 @@ func main() {
runSession(sessionID)
}
}

func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
var parsed map[string]interface{}
err := json.Unmarshal([]byte(msg), &parsed)
if nil != err {
log.Println(err)
return nil
}
if _, ok := parsed["type"]; !ok {
log.Println("Cannot deserialize SessionDescription without type field.")
return nil
}
if _, ok := parsed["sdp"]; !ok {
log.Println("Cannot deserialize SessionDescription without sdp field.")
return nil
}

var stype webrtc.SDPType
switch parsed["type"].(string) {
default:
log.Println("Unknown SDP type")
return nil
case "offer":
stype = webrtc.SDPTypeOffer
case "pranswer":
stype = webrtc.SDPTypePranswer
case "answer":
stype = webrtc.SDPTypeAnswer
case "rollback":
stype = webrtc.SDPTypeRollback
}

if err != nil {
log.Println(err)
return nil
}
return &webrtc.SessionDescription{
Type: stype,
SDP: parsed["sdp"].(string),
}
}

func serializeSessionDescription(desc *webrtc.SessionDescription) string {
bytes, err := json.Marshal(*desc)
if nil != err {
log.Println(err)
return ""
}
return string(bytes)
}

0 comments on commit 8770de7

Please sign in to comment.