Skip to content

Commit

Permalink
Merge pull request #106 from Azure/websocket
Browse files Browse the repository at this point in the history
Add websocket support
  • Loading branch information
devigned committed Apr 25, 2019
2 parents 76a6857 + ce512d0 commit 8f96175
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 3 deletions.
3 changes: 3 additions & 0 deletions changelog.md
@@ -1,5 +1,8 @@
# Change Log

## `v1.2.0`
- add websocket support

## `v1.1.5`
- add sender recovery handling for `amqp.ErrLinkClose`, `amqp.ErrConnClosed` and `amqp.ErrSessionClosed`

Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -14,5 +14,6 @@ require (
github.com/sirupsen/logrus v1.1.1
github.com/stretchr/testify v1.2.2
go.opencensus.io v0.18.0
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519
pack.ag/amqp v0.11.0
)
10 changes: 9 additions & 1 deletion hub.go
Expand Up @@ -53,7 +53,7 @@ const (
rootUserAgent = "/golang-event-hubs"

// Version is the semantic version number
Version = "1.1.5"
Version = "1.2.0"
)

type (
Expand Down Expand Up @@ -683,6 +683,14 @@ func HubWithEnvironment(env azure.Environment) HubOption {
}
}

// HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://
func HubWithWebSocketConnection() HubOption {
return func(h *Hub) error {
h.namespace.useWebSocket = true
return nil
}
}

func (h *Hub) appendAgent(userAgent string) error {
ua := path.Join(h.userAgent, userAgent)
if len(ua) > maxUserAgentLen {
Expand Down
57 changes: 57 additions & 0 deletions hub_examples_test.go
Expand Up @@ -74,6 +74,63 @@ func ExampleHub_helloWorld() {
// Output: Hello World!
}

func ExampleHub_webSocket() {
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
return
}

hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
if err != nil {
fmt.Println(err)
return
}

hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
if err != nil {
fmt.Println(err)
return
}

// Create a client to communicate with EventHub
hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name, eventhub.HubWithWebSocketConnection())
if err != nil {
fmt.Println(err)
return
}

err = hub.Send(ctx, eventhub.NewEventFromString("this message was sent and received via web socket!!"))
if err != nil {
fmt.Println(err)
return
}

exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
text := string(event.Data)
fmt.Println(text)
exit <- struct{}{}
return nil
}

for _, partitionID := range *hubEntity.PartitionIDs {
_, err = hub.Receive(ctx, partitionID, handler)
}

// wait for the first handler to get called with "Hello World!"
<-exit
err = hub.Close(ctx)
if err != nil {
fmt.Println(err)
return
}
// Output: this message was sent and received via web socket!!
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
he, err := em.Get(ctx, name)
if err == nil {
Expand Down
25 changes: 25 additions & 0 deletions hub_test.go
Expand Up @@ -279,6 +279,31 @@ func (suite *eventHubSuite) TestPartitioned() {
}
}

func (suite *eventHubSuite) TestWebSocket() {
tests := map[string]func(context.Context, *testing.T, *Hub, string){
"TestSend": testBasicSend,
"TestSendTooBig": testSendTooBig,
"TestSendAndReceive": testBasicSendAndReceive,
"TestBatchSendAndReceive": testBatchSendAndReceive,
"TestBatchSendTooLarge": testBatchSendTooLarge,
}

for name, testFunc := range tests {
setupTestTeardown := func(t *testing.T) {
hub, cleanup := suite.RandomHub()
defer cleanup()
partitionID := (*hub.PartitionIds)[0]
client, closer := suite.newClient(t, *hub.Name, HubWithPartitionedSender(partitionID), HubWithWebSocketConnection())
defer closer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
testFunc(ctx, t, client, partitionID)
}

suite.T().Run(name, setupTestTeardown)
}
}

func testBasicSend(ctx context.Context, t *testing.T, client *Hub, _ string) {
err := client.Send(ctx, NewEventFromString("Hello!"))
assert.NoError(t, err)
Expand Down
20 changes: 18 additions & 2 deletions namespace.go
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/Azure/azure-amqp-common-go/conn"
"github.com/Azure/azure-amqp-common-go/sas"
"github.com/Azure/go-autorest/autorest/azure"
"golang.org/x/net/websocket"
"pack.ag/amqp"
)

Expand All @@ -40,6 +41,7 @@ type (
name string
tokenProvider auth.TokenProvider
host string
useWebSocket bool
}

// namespaceOption provides structure for configuring a new Event Hub namespace
Expand Down Expand Up @@ -89,14 +91,28 @@ func newNamespace(opts ...namespaceOption) (*namespace, error) {

func (ns *namespace) newConnection() (*amqp.Client, error) {
host := ns.getAmqpsHostURI()
return amqp.Dial(host,

defaultConnOptions := []amqp.ConnOption{
amqp.ConnSASLAnonymous(),
amqp.ConnProperty("product", "MSGolangClient"),
amqp.ConnProperty("version", Version),
amqp.ConnProperty("platform", runtime.GOOS),
amqp.ConnProperty("framework", runtime.Version()),
amqp.ConnProperty("user-agent", rootUserAgent),
)
}

if ns.useWebSocket {
trimmedHost := strings.TrimPrefix(ns.host, "amqps://")
wssConn, err := websocket.Dial("wss://"+trimmedHost+"/$servicebus/websocket", "amqp", "http://localhost/")
if err != nil {
return nil, err
}

wssConn.PayloadType = websocket.BinaryFrame
return amqp.New(wssConn, append(defaultConnOptions, amqp.ConnServerHostname(trimmedHost))...)
}

return amqp.Dial(host, defaultConnOptions...)
}

func (ns *namespace) negotiateClaim(ctx context.Context, conn *amqp.Client, entityPath string) error {
Expand Down

0 comments on commit 8f96175

Please sign in to comment.