Skip to content

Commit

Permalink
feat: Create broker for negotiating connections (#14)
Browse files Browse the repository at this point in the history
* feat: Create broker for negotiating connections

WebRTC require an exchange of encryption keys and network hops to connect. This package pipes the exchange over gRPC. This will be used in all connecting clients and agents.

* Regenerate protobuf definition

* Cache Go build and test

* Fix gRPC language with dRPC

Co-authored-by: Bryan <bryan@coder.com>

Co-authored-by: Bryan <bryan@coder.com>
  • Loading branch information
kylecarbs and bryphe-coder committed Jan 11, 2022
1 parent 7c260f8 commit 53cfa8a
Show file tree
Hide file tree
Showing 14 changed files with 1,258 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Generated files
peerbroker/proto/*.go linguist-generated=true
provisionersdk/proto/*.go linguist-generated=true
12 changes: 12 additions & 0 deletions .github/workflows/coder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ jobs:
with:
go-version: "^1.17"

- uses: actions/cache@v2
with:
# Go mod cache, Linux build cache, Mac build cache, Windows build cache
path: |
~/go/pkg/mod
~/.cache/go-build
~/Library/Caches/go-build
%LocalAppData%\go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- run: go install gotest.tools/gotestsum@latest

- run:
Expand Down
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,19 @@ endif
fmt: fmt/prettier
.PHONY: fmt

gen: database/generate provisionersdk/proto
gen: database/generate peerbroker/proto provisionersdk/proto
.PHONY: gen

# Generates the protocol files.
peerbroker/proto: peerbroker/proto/peerbroker.proto
cd peerbroker/proto && protoc \
--go_out=. \
--go_opt=paths=source_relative \
--go-drpc_out=. \
--go-drpc_opt=paths=source_relative \
./peerbroker.proto
.PHONY: peerbroker/proto

# Generates the protocol files.
provisionersdk/proto: provisionersdk/proto/provisioner.proto
cd provisionersdk/proto && protoc \
Expand Down
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ coverage:

ignore:
# This is generated code.
- peerbroker/proto
- provisionersdk/proto
14 changes: 14 additions & 0 deletions peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription {
return c.localSessionDescriptionChannel
}

// SetConfiguration applies options to the WebRTC connection.
// Generally used for updating transport options, like ICE servers.
func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error {
return c.rtc.SetConfiguration(configuration)
}

// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
func (c *Conn) SetRemoteSessionDescription(s webrtc.SessionDescription) {
if c.isClosed() {
Expand Down Expand Up @@ -388,6 +394,9 @@ func (c *Conn) dialChannel(ctx context.Context, label string, opts *ChannelOpts)
if opts.OpenOnDisconnect && !opts.Negotiated {
return nil, xerrors.New("OpenOnDisconnect is only allowed for Negotiated channels")
}
if c.isClosed() {
return nil, xerrors.Errorf("closed: %w", c.closeError)
}

dc, err := c.rtc.CreateDataChannel(label, &webrtc.DataChannelInit{
ID: id,
Expand Down Expand Up @@ -446,6 +455,11 @@ func (c *Conn) Close() error {
return c.closeWithError(nil)
}

// CloseWithError closes the connection; subsequent reads/writes will return the error err.
func (c *Conn) CloseWithError(err error) error {
return c.closeWithError(err)
}

func (c *Conn) isClosed() bool {
select {
case <-c.closed:
Expand Down
10 changes: 10 additions & 0 deletions peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peer_test

import (
"context"
"errors"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -193,6 +194,15 @@ func TestConn(t *testing.T) {
require.NoError(t, err)
})

t.Run("CloseWithError", func(t *testing.T) {
conn, err := peer.Client([]webrtc.ICEServer{}, nil)
require.NoError(t, err)
expectedErr := errors.New("wow")
_ = conn.CloseWithError(expectedErr)
_, err = conn.Dial(context.Background(), "", nil)
require.ErrorIs(t, err, expectedErr)
})

t.Run("PingConcurrent", func(t *testing.T) {
t.Parallel()
client, server, _ := createPair(t)
Expand Down
113 changes: 113 additions & 0 deletions peerbroker/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package peerbroker

import (
"reflect"

"github.com/pion/webrtc/v3"
"golang.org/x/xerrors"

"github.com/coder/coder/peer"
"github.com/coder/coder/peerbroker/proto"
)

// Dial consumes the PeerBroker gRPC connection negotiation stream to produce a WebRTC peered connection.
func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []webrtc.ICEServer, opts *peer.ConnOpts) (*peer.Conn, error) {
// Convert WebRTC ICE servers to the protobuf type.
protoIceServers := make([]*proto.WebRTCICEServer, 0, len(iceServers))
for _, iceServer := range iceServers {
var credentialString string
if value, ok := iceServer.Credential.(string); ok {
credentialString = value
}
protoIceServers = append(protoIceServers, &proto.WebRTCICEServer{
Urls: iceServer.URLs,
Username: iceServer.Username,
Credential: credentialString,
CredentialType: int32(iceServer.CredentialType),
})
}
if len(protoIceServers) > 0 {
// Send ICE servers to connect with.
// Client sends ICE servers so clients can determine the node
// servers will meet at. eg. us-west1.coder.com could be a TURN server.
err := stream.Send(&proto.NegotiateConnection_ClientToServer{
Message: &proto.NegotiateConnection_ClientToServer_Servers{
Servers: &proto.WebRTCICEServers{
Servers: protoIceServers,
},
},
})
if err != nil {
return nil, xerrors.Errorf("write ice servers: %w", err)
}
}

peerConn, err := peer.Client(iceServers, opts)
if err != nil {
return nil, xerrors.Errorf("create peer connection: %w", err)
}
go func() {
defer stream.Close()
// Exchanging messages from the peer connection to negotiate a connection.
for {
select {
case <-peerConn.Closed():
return
case sessionDescription := <-peerConn.LocalSessionDescription():
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
Message: &proto.NegotiateConnection_ClientToServer_Offer{
Offer: &proto.WebRTCSessionDescription{
SdpType: int32(sessionDescription.Type),
Sdp: sessionDescription.SDP,
},
},
})
if err != nil {
_ = peerConn.CloseWithError(xerrors.Errorf("send local session description: %w", err))
return
}
case iceCandidate := <-peerConn.LocalCandidate():
err = stream.Send(&proto.NegotiateConnection_ClientToServer{
Message: &proto.NegotiateConnection_ClientToServer_IceCandidate{
IceCandidate: iceCandidate.Candidate,
},
})
if err != nil {
_ = peerConn.CloseWithError(xerrors.Errorf("send local candidate: %w", err))
return
}
}
}
}()
go func() {
// Exchanging messages from the server to negotiate a connection.
for {
serverToClientMessage, err := stream.Recv()
if err != nil {
_ = peerConn.CloseWithError(err)
return
}

switch {
case serverToClientMessage.GetAnswer() != nil:
peerConn.SetRemoteSessionDescription(webrtc.SessionDescription{
Type: webrtc.SDPType(serverToClientMessage.GetAnswer().SdpType),
SDP: serverToClientMessage.GetAnswer().Sdp,
})
case serverToClientMessage.GetIceCandidate() != "":
err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
Candidate: serverToClientMessage.GetIceCandidate(),
})
if err != nil {
_ = peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
return
}
default:
_ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String()))
return
}
}
}()

return peerConn, nil
}
48 changes: 48 additions & 0 deletions peerbroker/dial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package peerbroker_test

import (
"context"
"testing"

"github.com/coder/coder/peerbroker"
"github.com/coder/coder/peerbroker/proto"
"github.com/coder/coder/provisionersdk"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"storj.io/drpc/drpcconn"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestDial(t *testing.T) {
t.Run("Connect", func(t *testing.T) {
ctx := context.Background()
client, server := provisionersdk.TransportPipe()
defer client.Close()
defer server.Close()

listener, err := peerbroker.Listen(server, nil)
require.NoError(t, err)

api := proto.NewDRPCPeerBrokerClient(drpcconn.New(client))
stream, err := api.NegotiateConnection(ctx)
require.NoError(t, err)
clientConn, err := peerbroker.Dial(stream, []webrtc.ICEServer{{
URLs: []string{"stun:stun.l.google.com:19302"},
}}, nil)
require.NoError(t, err)
defer clientConn.Close()

serverConn, err := listener.Accept()
require.NoError(t, err)
defer serverConn.Close()
_, err = serverConn.Ping()
require.NoError(t, err)

_, err = clientConn.Ping()
require.NoError(t, err)
})
}

0 comments on commit 53cfa8a

Please sign in to comment.