Skip to content

Commit

Permalink
webrtc: refactor interfaces and update example
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Smirnov committed Oct 6, 2018
1 parent 53c7549 commit 9a29137
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 255 deletions.
221 changes: 126 additions & 95 deletions examples/webrtc/main.go
Expand Up @@ -8,111 +8,63 @@ import (
"github.com/dennwc/dom/net/webrtc"
)

type discovery struct {
send chan<- []byte
recv <-chan []byte
}
func main() {
sig := NewChannel()

func (d *discovery) Broadcast(data []byte) (webrtc.Listener, error) {
d.send <- data
close(d.send)
return &discoveryLis{recv: d.recv}, nil
go Alice(sig)
Bob(sig)
}

type discoveryLis struct {
recv <-chan []byte
}
func Alice(sig webrtc.Signalling) {
const name = "alice"
p1 := webrtc.New(name, sig)

func (l *discoveryLis) Accept() ([]byte, error) {
data, ok := <-l.recv
if !ok {
return nil, io.EOF
fmt.Println(name + ": peer discovery started")
peers, err := p1.Discover()
if err != nil {
panic(err)
}
return data, nil
}

func (l *discoveryLis) Close() error {
return nil
}

type offerLis struct {
recv <-chan []byte
send chan<- []byte
}

func (l *offerLis) Answer(data []byte) error {
l.send <- data
close(l.send)
l.send = nil
return nil
}
defer peers.Close()

func (l *offerLis) Accept() ([]byte, error) {
data, ok := <-l.recv
if !ok {
return nil, io.EOF
fmt.Println(name + ": waiting for peers")
info, err := peers.Accept()
if err != nil {
panic(err)
}
return data, nil
}

func (l *offerLis) Close() error {
if l.send != nil {
close(l.send)
l.send = nil
pname := info.UID()
fmt.Printf(name+": dialing peer %q\n", pname)
conn, err := info.Dial()
if err != nil {
panic(err)
}
return nil
}

func main() {
ch1to2 := make(chan []byte, 1)
ch2to1 := make(chan []byte, 1)

go func() {
fmt.Println("1: peer discovery started")
peers, err := webrtc.Discover(&discovery{
send: ch1to2, recv: ch2to1,
})
if err != nil {
panic(err)
}
defer peers.Close()
defer conn.Close()

fmt.Println("1: waiting for peers")
info, err := peers.Accept()
if err != nil {
panic(err)
}
fmt.Println(name + ": connected!")
_, err = fmt.Fprintf(conn, "hello from %q\n", name)
if err != nil {
panic(err)
}
fmt.Println(name + ": sent data")

fmt.Println("1: dialing peer")
conn, err := info.Dial()
if err != nil {
buf := make([]byte, 128)
for {
n, err := conn.Read(buf)
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
defer conn.Close()
fmt.Printf(name+": msg from %q: %q\n", pname, string(buf[:n]))
}
}

fmt.Println("1: connected!")
_, err = conn.Write([]byte("hello from 1\n"))
if err != nil {
panic(err)
}
fmt.Println("1: sent data")

buf := make([]byte, 128)
for {
n, err := conn.Read(buf)
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
fmt.Println("1:", string(buf[:n]))
}
}()
func Bob(sig webrtc.Signalling) {
const name = "bob"

fmt.Println("2: waiting for offers")
peers, err := webrtc.Listen(&offerLis{
send: ch2to1, recv: ch1to2,
})
p2 := webrtc.New(name, sig)
fmt.Println(name + ": listening for offers")
peers, err := p2.Listen()
if err != nil {
panic(err)
}
Expand All @@ -123,26 +75,27 @@ func main() {
panic(err)
}

fmt.Println("2: dialing peer")
pname := info.UID()
fmt.Printf(name+": dialing peer %q\n", pname)
conn, err := info.Dial()
if err != nil {
panic(err)
}
defer conn.Close()

fmt.Println("2: connected!")
_, err = conn.Write([]byte("hello from 2\n"))
fmt.Println(name + ": connected!")
_, err = fmt.Fprintf(conn, "hello from %q\n", name)
if err != nil {
panic(err)
}
fmt.Println("2: sent data")
fmt.Println(name + ": sent data")

buf := make([]byte, 128)
n, err := conn.Read(buf)
if err != nil {
panic(err)
}
fmt.Println("2:", string(buf[:n]))
fmt.Printf(name+": msg from %q: %q\n", pname, string(buf[:n]))

for {
_, err = conn.Write([]byte(time.Now().String() + "\n"))
Expand All @@ -152,3 +105,81 @@ func main() {
time.Sleep(time.Second * 5)
}
}

// NewChannel creates a new signalling channel. It expects exactly one call to Broadcast and exactly one call to Listen.
func NewChannel() webrtc.Signalling {
return &signalChannel{
broadcast: make(chan webrtc.Signal, 1),
accept: make(chan webrtc.Signal, 1),
}
}

type signalChannel struct {
broadcast chan webrtc.Signal
accept chan webrtc.Signal
}

func (b *signalChannel) Broadcast(s webrtc.Signal) (webrtc.AnswerStream, error) {
b.broadcast <- s
close(b.broadcast)
return &answers{accept: b.accept}, nil
}

type answers struct {
accept <-chan webrtc.Signal
}

func (a *answers) Next() (webrtc.Signal, error) {
s, ok := <-a.accept
if !ok {
return webrtc.Signal{}, io.EOF
}
return s, nil
}

func (a *answers) Close() error {
ch := make(chan webrtc.Signal)
close(ch)
a.accept = ch
return nil
}

func (b *signalChannel) Listen(uid string) (webrtc.OfferStream, error) {
return &offers{broadcast: b.broadcast, accept: b.accept}, nil
}

type offers struct {
broadcast <-chan webrtc.Signal
accept chan<- webrtc.Signal
}

func (o *offers) Next() (webrtc.Offer, error) {
s, ok := <-o.broadcast
if !ok {
return nil, io.EOF
}
return &offer{accept: o.accept, s: s}, nil
}

func (o *offers) Close() error {
ch := make(chan webrtc.Signal)
close(ch)
o.broadcast = ch
return nil
}

type offer struct {
accept chan<- webrtc.Signal
s webrtc.Signal
}

func (o *offer) Answer(s webrtc.Signal) error {
o.accept <- s
close(o.accept)
o.accept = nil
return nil
}

func (o *offer) Info() webrtc.Signal {
return o.s
}
75 changes: 75 additions & 0 deletions net/webrtc/active.go
@@ -0,0 +1,75 @@
package webrtc

import (
"encoding/json"
"net"
)

// answerStream is an implementation of peer discovery when broadcasting (active).
type answerStream struct {
self string // user id
c *peerConnection

local connInfo // local WebRTC info
answers AnswerStream
}

func (p *answerStream) Close() error {
p.answers.Close()
if p.c != nil {
return p.c.Close()
}
return nil
}

func (p *answerStream) Accept() (Peer, error) {
// get the next answer, but don't use it yet
ans, err := p.answers.Next()
if err != nil {
return nil, err
}
var info connInfo
if err = json.Unmarshal(ans.Data, &info); err != nil {
return nil, err
}
return &peerAnswer{s: p, uid: ans.UID, info: info}, nil
}

type peerAnswer struct {
s *answerStream
uid string
info connInfo
}

func (p *peerAnswer) UID() string {
return p.uid
}

func (p *peerAnswer) Dial() (net.Conn, error) {
// if we are initiating a connection, we have just received an info from peer
// and we are ready to apply its configuration and start dialing
c := p.s.c

// switch to this peer and try to dial it
err := c.SetRemoteDescription(p.info.SDP)
if err != nil {
c.Close()
return nil, err
}

err = c.SetICECandidates(p.info.ICEs)
if err != nil {
c.Close()
return nil, err
}
// take ownership of the connection
p.s.c = nil

// now we should only wait for a state change to "connected"
// but instead we will wait for a data stream to come online
ch, err := c.WaitChannel(primaryChan)
if err != nil {
return nil, err
}
return ch, nil
}

0 comments on commit 9a29137

Please sign in to comment.