Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
base repository: cohosh/snowflake
base: master
head repository: cohosh/snowflake
compare: ticket/32938
Checking mergeability… Don’t worry, you can still create the pull request.
  • 8 commits
  • 3 files changed
  • 0 comments
  • 1 contributor
Commits on Feb 27, 2020
This will contact a (for now hardcoded) probe point to perform a
throughput test of the proxy before it polls the broker. For now we just
perform a poll of the probe point to receive their WebRTC offer.
Commits on Mar 02, 2020
Send an SDP answer to the remote probe point and recieve the test
results. For now we just log results.
This refactors makePeerConnectionFromOffer to take a datachannel handler
as an argument. This will allow us to reuse this function when
conducting the throughput test.
Showing with 184 additions and 54 deletions.
  1. +2 −2 proxy-go/proxy-go_test.go
  2. +139 −52 proxy-go/snowflake.go
  3. +43 −0 proxy-go/throughput.go
@@ -227,7 +227,7 @@ func TestBrokerInteractions(t *testing.T) {
const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`

Convey("Proxy connections to broker", t, func() {
broker := new(Broker)
broker := new(Remote)
broker.url, _ = url.Parse("localhost")

//Mock peerConnection
@@ -307,7 +307,7 @@ func TestBrokerInteractions(t *testing.T) {
}
err = broker.sendAnswer("test", pc)
So(err, ShouldNotEqual, nil)
So(err.Error(), ShouldResemble, "broker returned 410")
So(err.Error(), ShouldResemble, "error sending answer to broker: remote returned status code 410")

//Error if we can't parse broker message
broker.transport = &MockTransport{
@@ -28,6 +28,7 @@ import (

const defaultBrokerURL = "https://snowflake-broker.bamsoftware.com/"
const defaultRelayURL = "wss://snowflake.bamsoftware.com/"
const defaultProbeURL = "http://159.203.63.110:8080"
const defaultSTUNURL = "stun:stun.l.google.com:19302"
const pollInterval = 5 * time.Second

@@ -37,7 +38,7 @@ const dataChannelTimeout = 20 * time.Second

const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request

var broker *Broker
var broker *Remote
var relayURL string

const (
@@ -69,11 +70,6 @@ func remoteIPFromSDP(sdp string) net.IP {
return nil
}

type Broker struct {
url *url.URL
transport http.RoundTripper
}

type webRTCConn struct {
dc *webrtc.DataChannel
pc *webrtc.PeerConnection
@@ -158,8 +154,32 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) {
return p, err
}

func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
type Remote struct {
url *url.URL
transport http.RoundTripper
}

func (r *Remote) MakePost(path string, payload io.Reader) ([]byte, error) {

req, err := http.NewRequest("POST", path, payload)
if err != nil {
return nil, err
}
resp, err := r.transport.RoundTrip(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode)
}

defer resp.Body.Close()
return limitedRead(resp.Body, readLimit)
}

func (r *Remote) pollOffer(sid string) *webrtc.SessionDescription {
brokerPath := r.url.ResolveReference(&url.URL{Path: "proxy"})
timeOfNextPoll := time.Now()
for {
// Sleep until we're scheduled to poll again.
@@ -178,56 +198,36 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
log.Printf("Error encoding poll message: %s", err.Error())
return nil
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
log.Printf("error polling broker: %s", err)
} else {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Printf("broker returns: %d", resp.StatusCode)
} else {
body, err := limitedRead(resp.Body, readLimit)
if err != nil {
log.Printf("error reading broker response: %s", err)
} else {

offer, err := messages.DecodePollResponse(body)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
return nil
}
if offer != "" {
return deserializeSessionDescription(offer)
}
}
}
log.Printf("error polling broker: %s", err.Error())
return nil
}
offer, err := messages.DecodePollResponse(response)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
return nil
}
if offer != "" {
return deserializeSessionDescription(offer)
}
}
}

func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
func (r *Remote) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
brokerPath := r.url.ResolveReference(&url.URL{Path: "answer"})
answer := string([]byte(serializeSessionDescription(pc.LocalDescription())))
body, err := messages.EncodeAnswerRequest(answer, sid)
if err != nil {
return err
}
req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
resp, err := b.transport.RoundTrip(req)
response, err := r.MakePost(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("broker returned %d", resp.StatusCode)
return fmt.Errorf("error sending answer to broker: %s", err.Error())
}

body, err = limitedRead(resp.Body, readLimit)
if err != nil {
return fmt.Errorf("error reading broker response: %s", err)
}
success, err := messages.DecodeAnswerResponse(body)
success, err := messages.DecodeAnswerResponse(response)
if err != nil {
return err
}
@@ -258,7 +258,7 @@ func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) {
// conn.RemoteAddr() inside this function, as a workaround for a hang that
// otherwise occurs inside of conn.pc.RemoteDescription() (called by
// RemoteAddr). https://bugs.torproject.org/18628#comment:8
func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
func datachannelHandler(conn *webRTCConn) {
defer conn.Close()
defer retToken()

@@ -268,10 +268,10 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
}

// Retrieve client IP address
if remoteAddr != nil {
if conn.RemoteAddr() != nil {
// Encode client IP address in relay URL
q := u.Query()
clientIP := remoteAddr.String()
clientIP := conn.RemoteAddr().String()
q.Set("client_ip", clientIP)
u.RawQuery = q.Encode()
} else {
@@ -290,11 +290,24 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
log.Printf("datachannelHandler ends")
}

// Handler for the throughput test
func throughputHandler(conn *webRTCConn) {
defer conn.Close()

if _, err := io.Copy(conn, conn); err != nil {
log.Printf("io.Copy inside CopyLoop generated an error: %v", err)
}

}

// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
config webrtc.Configuration, dataChan chan struct{},
handler func(conn *webRTCConn)) (*webrtc.PeerConnection, error) {

pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
@@ -330,7 +343,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C
}
})

go datachannelHandler(conn, conn.RemoteAddr())
go handler(conn)
})

err = pc.SetRemoteDescription(*sdp)
@@ -373,7 +386,7 @@ func runSession(sid string) {
return
}
dataChan := make(chan struct{})
pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
@@ -403,15 +416,76 @@ func runSession(sid string) {
}
}

func testThroughput(config webrtc.Configuration, probeURL string) {
var err error
var offer SnowflakeOffer
var result SnowflakeResult

sessionID := genSessionID()

probe := new(Remote)
probe.transport = http.DefaultTransport.(*http.Transport)
probe.url, err = url.Parse(probeURL)
if err != nil {
log.Printf("Error parsing url: %s", err.Error())
}
probePath := probe.url.ResolveReference(&url.URL{Path: "api/snowflake-poll"})

response, err := probe.MakePost(probePath.String(), bytes.NewBuffer(CreateSnowflakeRequest(sessionID)))
if err != nil {
log.Printf("Error connecting to probe point: %s", err.Error())
return
}
if err = json.Unmarshal(response, &offer); err != nil {
log.Printf("error reading bridgestrap response: %s", err.Error())
log.Printf("body: %s", response)
return
}

sdp := deserializeSessionDescription(offer.Offer)

// create answer
dataChan := make(chan struct{})
pc, err := makePeerConnectionFromOffer(sdp, config, dataChan, throughputHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
return
}

answer := pc.LocalDescription()

// send answer
testReq := CreateSnowflakeAnswer(sessionID, serializeSessionDescription(answer))
probePath = probe.url.ResolveReference(&url.URL{Path: "api/snowflake-test"})

response, err = probe.MakePost(probePath.String(), bytes.NewBuffer(testReq))
if err != nil {
log.Printf("Error connecting to probe point: %s", err.Error())
return
}
if err = json.Unmarshal(response, &result); err != nil {
log.Printf("error reading bridgestrap response: %s", err.Error())
log.Printf("body: %s", response)
return
}

log.Printf("Throughput: %f Kbps", result.Throughput)
log.Printf("Latency: %f s", result.Latency)

}

func main() {
var capacity uint
var stunURL string
var logFilename string
var rawBrokerURL string
var probeURL string

flag.UintVar(&capacity, "capacity", 10, "maximum concurrent clients")
flag.StringVar(&rawBrokerURL, "broker", defaultBrokerURL, "broker URL")
flag.StringVar(&relayURL, "relay", defaultRelayURL, "websocket relay URL")
flag.StringVar(&probeURL, "probe", defaultProbeURL, "URL for throughput testing probe")
flag.StringVar(&stunURL, "stun", defaultSTUNURL, "stun URL")
flag.StringVar(&logFilename, "log", "", "log filename")
flag.Parse()
@@ -432,7 +506,7 @@ func main() {
log.Println("starting")

var err error
broker = new(Broker)
broker = new(Remote)
broker.url, err = url.Parse(rawBrokerURL)
if err != nil {
log.Fatalf("invalid broker url: %s", err)
@@ -445,6 +519,10 @@ func main() {
if err != nil {
log.Fatalf("invalid relay url: %s", err)
}
_, err = url.Parse(probeURL)
if err != nil {
log.Fatalf("invalid probe url: %s", err)
}

broker.transport = http.DefaultTransport.(*http.Transport)
config = webrtc.Configuration{
@@ -459,6 +537,15 @@ func main() {
tokens <- true
}

//Perform a throughput test
testThroughput(config, probeURL)
go func() {
heartbeat := time.Tick(24 * time.Hour)
for range heartbeat {
testThroughput(config, probeURL)
}
}()

for {
getToken()
sessionID := genSessionID()
@@ -0,0 +1,43 @@
package main

import (
"encoding/json"
)

type SnowflakeRequest struct {
SnowflakeID string `json:"snowflake_id"`
}

type SnowflakeOffer struct {
Offer string `json:"offer"`
}

type SnowflakeAnswer struct {
SnowflakeID string `json:"snowflake_id"`
Answer string `json:"answer"`
}

type SnowflakeResult struct {
Throughput float64 `json:"throughput"`
Latency float64 `json:"latency"`
Error string `json:"error"`
}

func CreateSnowflakeRequest(id string) []byte {
request := &SnowflakeRequest{
SnowflakeID: id,
}
jsonRequest, _ := json.Marshal(request)

return jsonRequest
}

func CreateSnowflakeAnswer(id string, answer string) []byte {
request := &SnowflakeAnswer{
SnowflakeID: id,
Answer: answer,
}
jsonRequest, _ := json.Marshal(request)

return jsonRequest
}

No commit comments for this range