Permalink
Cannot retrieve contributors at this time
375 lines (351 sloc)
9.9 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
| package lib | |
| import ( | |
| "bytes" | |
| "errors" | |
| "io" | |
| "log" | |
| "sync" | |
| "time" | |
| "github.com/dchest/uniuri" | |
| "github.com/pion/logging" | |
| "github.com/pion/webrtc" | |
| ) | |
| // Remote WebRTC peer. | |
| // Implements the |Snowflake| interface, which includes | |
| // |io.ReadWriter|, |Resetter|, and |Connector|. | |
| // | |
| // Handles preparation of go-webrtc PeerConnection. Only ever has | |
| // one DataChannel. | |
| type WebRTCPeer struct { | |
| id string | |
| config *webrtc.Configuration | |
| pc *webrtc.PeerConnection | |
| transport SnowflakeDataChannel // Holds the WebRTC DataChannel. | |
| broker *BrokerChannel | |
| offerChannel chan *webrtc.SessionDescription | |
| answerChannel chan *webrtc.SessionDescription | |
| errorChannel chan error | |
| recvPipe *io.PipeReader | |
| writePipe *io.PipeWriter | |
| lastReceive time.Time | |
| buffer bytes.Buffer | |
| reset chan struct{} | |
| closed bool | |
| lock sync.Mutex // Synchronization for DataChannel destruction | |
| once sync.Once // Synchronization for PeerConnection destruction | |
| BytesLogger | |
| } | |
| // Construct a WebRTC PeerConnection. | |
| func NewWebRTCPeer(config *webrtc.Configuration, | |
| broker *BrokerChannel) *WebRTCPeer { | |
| connection := new(WebRTCPeer) | |
| connection.id = "snowflake-" + uniuri.New() | |
| connection.config = config | |
| connection.broker = broker | |
| connection.offerChannel = make(chan *webrtc.SessionDescription, 1) | |
| connection.answerChannel = make(chan *webrtc.SessionDescription, 1) | |
| // Error channel is mostly for reporting during the initial SDP offer | |
| // creation & local description setting, which happens asynchronously. | |
| connection.errorChannel = make(chan error, 1) | |
| connection.reset = make(chan struct{}, 1) | |
| // Override with something that's not NullLogger to have real logging. | |
| connection.BytesLogger = &BytesNullLogger{} | |
| // Pipes remain the same even when DataChannel gets switched. | |
| connection.recvPipe, connection.writePipe = io.Pipe() | |
| return connection | |
| } | |
| // Read bytes from local SOCKS. | |
| // As part of |io.ReadWriter| | |
| func (c *WebRTCPeer) Read(b []byte) (int, error) { | |
| return c.recvPipe.Read(b) | |
| } | |
| // Writes bytes out to remote WebRTC. | |
| // As part of |io.ReadWriter| | |
| func (c *WebRTCPeer) Write(b []byte) (int, error) { | |
| c.lock.Lock() | |
| defer c.lock.Unlock() | |
| c.BytesLogger.AddOutbound(len(b)) | |
| // TODO: Buffering could be improved / separated out of WebRTCPeer. | |
| if nil == c.transport { | |
| log.Printf("Buffered %d bytes --> WebRTC", len(b)) | |
| c.buffer.Write(b) | |
| } else { | |
| c.transport.Send(b) | |
| } | |
| return len(b), nil | |
| } | |
| // As part of |Snowflake| | |
| func (c *WebRTCPeer) Close() error { | |
| c.once.Do(func() { | |
| c.closed = true | |
| c.cleanup() | |
| c.Reset() | |
| log.Printf("WebRTC: Closing") | |
| }) | |
| return nil | |
| } | |
| // As part of |Resetter| | |
| func (c *WebRTCPeer) Reset() { | |
| if nil == c.reset { | |
| return | |
| } | |
| c.reset <- struct{}{} | |
| } | |
| // As part of |Resetter| | |
| func (c *WebRTCPeer) WaitForReset() { <-c.reset } | |
| // Prevent long-lived broken remotes. | |
| // Should also update the DataChannel in underlying go-webrtc's to make Closes | |
| // more immediate / responsive. | |
| func (c *WebRTCPeer) checkForStaleness() { | |
| c.lastReceive = time.Now() | |
| for { | |
| if c.closed { | |
| return | |
| } | |
| if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout { | |
| log.Println("WebRTC: No messages received for", SnowflakeTimeout, | |
| "seconds -- closing stale connection.") | |
| c.Close() | |
| return | |
| } | |
| <-time.After(time.Second) | |
| } | |
| } | |
| // As part of |Connector| interface. | |
| func (c *WebRTCPeer) Connect() error { | |
| log.Println(c.id, " connecting...") | |
| // TODO: When go-webrtc is more stable, it's possible that a new | |
| // PeerConnection won't need to be re-prepared each time. | |
| err := c.preparePeerConnection() | |
| if err != nil { | |
| return err | |
| } | |
| err = c.establishDataChannel() | |
| if err != nil { | |
| return errors.New("WebRTC: Could not establish DataChannel.") | |
| } | |
| err = c.exchangeSDP() | |
| if err != nil { | |
| return err | |
| } | |
| go c.checkForStaleness() | |
| return nil | |
| } | |
| // Create and prepare callbacks on a new WebRTC PeerConnection. | |
| func (c *WebRTCPeer) preparePeerConnection() error { | |
| if nil != c.pc { | |
| c.pc.Close() | |
| c.pc = nil | |
| } | |
| logFactory := logging.NewDefaultLoggerFactory() | |
| logFactory.DefaultLogLevel = logging.LogLevelError | |
| logFactory.Writer = log.Writer() | |
| s := webrtc.SettingEngine{ | |
| LoggerFactory: logFactory, | |
| } | |
| s.SetTrickle(true) | |
| api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) | |
| pc, err := api.NewPeerConnection(*c.config) | |
| if err != nil { | |
| log.Printf("NewPeerConnection ERROR: %s", err) | |
| return err | |
| } | |
| // Prepare PeerConnection callbacks. | |
| // Allow candidates to accumulate until ICEGatheringStateComplete. | |
| pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { | |
| if candidate == nil { | |
| log.Printf("WebRTC: Done gathering candidates") | |
| } else { | |
| log.Printf("WebRTC: Got ICE candidate: %s", candidate.String()) | |
| } | |
| }) | |
| pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { | |
| if state == webrtc.ICEGathererStateComplete { | |
| log.Println("WebRTC: ICEGatheringStateComplete") | |
| c.offerChannel <- pc.LocalDescription() | |
| } | |
| }) | |
| // This callback is not expected, as the Client initiates the creation | |
| // of the data channel, not the remote peer. | |
| pc.OnDataChannel(func(channel *webrtc.DataChannel) { | |
| log.Println("OnDataChannel") | |
| panic("Unexpected OnDataChannel!") | |
| }) | |
| c.pc = pc | |
| go func() { | |
| offer, err := pc.CreateOffer(nil) | |
| // TODO: Potentially timeout and retry if ICE isn't working. | |
| if err != nil { | |
| c.errorChannel <- err | |
| return | |
| } | |
| log.Println("WebRTC: Created offer") | |
| err = pc.SetLocalDescription(offer) | |
| if err != nil { | |
| c.errorChannel <- err | |
| return | |
| } | |
| log.Println("WebRTC: Set local description") | |
| }() | |
| log.Println("WebRTC: PeerConnection created.") | |
| return nil | |
| } | |
| // Create a WebRTC DataChannel locally. | |
| func (c *WebRTCPeer) establishDataChannel() error { | |
| c.lock.Lock() | |
| defer c.lock.Unlock() | |
| if c.transport != nil { | |
| panic("Unexpected datachannel already exists!") | |
| } | |
| ordered := true | |
| dataChannelOptions := &webrtc.DataChannelInit{ | |
| Ordered: &ordered, | |
| } | |
| dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions) | |
| // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare | |
| // an SDP offer while other goroutines operating on this struct handle the | |
| // signaling. Eventually fires "OnOpen". | |
| if err != nil { | |
| log.Printf("CreateDataChannel ERROR: %s", err) | |
| return err | |
| } | |
| dc.OnOpen(func() { | |
| c.lock.Lock() | |
| defer c.lock.Unlock() | |
| log.Println("WebRTC: DataChannel.OnOpen") | |
| if nil != c.transport { | |
| panic("WebRTC: transport already exists.") | |
| } | |
| // Flush buffered outgoing SOCKS data if necessary. | |
| if c.buffer.Len() > 0 { | |
| dc.Send(c.buffer.Bytes()) | |
| log.Println("Flushed", c.buffer.Len(), "bytes.") | |
| c.buffer.Reset() | |
| } | |
| // Then enable the datachannel. | |
| c.transport = dc | |
| }) | |
| dc.OnClose(func() { | |
| c.lock.Lock() | |
| // Future writes will go to the buffer until a new DataChannel is available. | |
| if nil == c.transport { | |
| // Closed locally, as part of a reset. | |
| log.Println("WebRTC: DataChannel.OnClose [locally]") | |
| c.lock.Unlock() | |
| return | |
| } | |
| // Closed remotely, need to reset everything. | |
| // Disable the DataChannel as a write destination. | |
| log.Println("WebRTC: DataChannel.OnClose [remotely]") | |
| c.transport = nil | |
| dc.Close() | |
| // Unlock before Close'ing, since it calls cleanup and asks for the | |
| // lock to check if the transport needs to be be deleted. | |
| c.lock.Unlock() | |
| c.Close() | |
| }) | |
| dc.OnMessage(func(msg webrtc.DataChannelMessage) { | |
| if len(msg.Data) <= 0 { | |
| log.Println("0 length message---") | |
| } | |
| c.BytesLogger.AddInbound(len(msg.Data)) | |
| n, err := c.writePipe.Write(msg.Data) | |
| if err != nil { | |
| // TODO: Maybe shouldn't actually close. | |
| log.Println("Error writing to SOCKS pipe") | |
| c.writePipe.CloseWithError(err) | |
| } | |
| if n != len(msg.Data) { | |
| log.Println("Error: short write") | |
| panic("short write") | |
| } | |
| c.lastReceive = time.Now() | |
| }) | |
| log.Println("WebRTC: DataChannel created.") | |
| return nil | |
| } | |
| func (c *WebRTCPeer) sendOfferToBroker() { | |
| if nil == c.broker { | |
| return | |
| } | |
| offer := c.pc.LocalDescription() | |
| answer, err := c.broker.Negotiate(offer) | |
| if nil != err || nil == answer { | |
| log.Printf("BrokerChannel Error: %s", err) | |
| answer = nil | |
| } | |
| c.answerChannel <- answer | |
| } | |
| // Block until an SDP offer is available, send it to either | |
| // the Broker or signal pipe, then await for the SDP answer. | |
| func (c *WebRTCPeer) exchangeSDP() error { | |
| select { | |
| case <-c.offerChannel: | |
| case err := <-c.errorChannel: | |
| log.Println("Failed to prepare offer", err) | |
| c.Close() | |
| return err | |
| } | |
| // Keep trying the same offer until a valid answer arrives. | |
| var ok bool | |
| var answer *webrtc.SessionDescription = nil | |
| for nil == answer { | |
| go c.sendOfferToBroker() | |
| answer, ok = <-c.answerChannel // Blocks... | |
| if !ok || nil == answer { | |
| log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout) | |
| <-time.After(time.Second * ReconnectTimeout) | |
| answer = nil | |
| } | |
| } | |
| log.Printf("Received Answer.\n") | |
| err := c.pc.SetRemoteDescription(*answer) | |
| if nil != err { | |
| log.Println("WebRTC: Unable to SetRemoteDescription:", err) | |
| return err | |
| } | |
| return nil | |
| } | |
| // Close all channels and transports | |
| func (c *WebRTCPeer) cleanup() { | |
| if nil != c.offerChannel { | |
| close(c.offerChannel) | |
| } | |
| if nil != c.answerChannel { | |
| close(c.answerChannel) | |
| } | |
| if nil != c.errorChannel { | |
| close(c.errorChannel) | |
| } | |
| // Close this side of the SOCKS pipe. | |
| if nil != c.writePipe { | |
| c.writePipe.Close() | |
| c.writePipe = nil | |
| } | |
| c.lock.Lock() | |
| if nil != c.transport { | |
| log.Printf("WebRTC: closing DataChannel") | |
| dataChannel := c.transport | |
| // Setting transport to nil *before* dc Close indicates to OnClose that | |
| // this was locally triggered. | |
| c.transport = nil | |
| // Release the lock before calling DeleteDataChannel (which in turn | |
| // calls Close on the dataChannel), but after nil'ing out the transport, | |
| // since otherwise we'll end up in the onClose handler in a deadlock. | |
| c.lock.Unlock() | |
| if c.pc == nil { | |
| panic("DataChannel w/o PeerConnection, not good.") | |
| } | |
| dataChannel.(*webrtc.DataChannel).Close() | |
| } else { | |
| c.lock.Unlock() | |
| } | |
| if nil != c.pc { | |
| log.Printf("WebRTC: closing PeerConnection") | |
| err := c.pc.Close() | |
| if nil != err { | |
| log.Printf("Error closing peerconnection...") | |
| } | |
| c.pc = nil | |
| } | |
| } |