Skip to content

Commit

Permalink
add Event Hubs samples
Browse files Browse the repository at this point in the history
demonstrates:
* management of namespaces and hubs
* send, receive from one partition, and receive with EPH
  • Loading branch information
joshgav committed Jul 12, 2018
1 parent 675d809 commit 891ca18
Show file tree
Hide file tree
Showing 9 changed files with 514 additions and 13 deletions.
104 changes: 97 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions Gopkg.toml
@@ -1,22 +1,29 @@
[[constraint]]
# change to [[constraint]] once merged to master
[[override]]
name = "github.com/Azure/azure-sdk-for-go"
branch = "master"
version = ">=18.0.0"

[[constraint]]
# change to [[constraint]] once merged to master
[[override]]
name = "github.com/Azure/azure-storage-blob-go"
branch = "master"
version = "~0.1.4"

# change to [[constraint]] once merged to master
[[override]]
name = "github.com/Azure/go-autorest"
version = "^10.12.0"
version = ">=10.12.0"

[[constraint]]
name = "github.com/Azure/azure-event-hubs-go"
branch = "master"
name = "github.com/denisenkom/go-mssqldb"

[[constraint]]
name = "github.com/denisenkom/go-mssqldb"
branch = "master"

[[constraint]]
name = "github.com/globalsign/mgo"
branch = "master"

[[constraint]]
name = "github.com/satori/go.uuid"
Expand Down
15 changes: 15 additions & 0 deletions eventhubs/README.md
@@ -0,0 +1,15 @@
# Azure Event Hubs

This directory contains samples for managing and using [Azure Event Hubs][1].
The following functionality is demonstrated:

* Namespace creation in [./namespace.go](./namespace.go)
* Hub creation in [./hub.go](./hub.go)
* Sending events in [./send_events.go](./send_events.go)
* Receiving events from a designated partition in [./receive_events.go](./receive_events.go).
* Receiving events with EventProcessorHost in [./receive_eph.go](./receive_eph.go)

You can run the tests in this repo by creating a `.env` file as described in
the root README, and invoking `go test -v .` from this directory.

[1]: https://docs.microsoft.com/en-us/azure/event-hubs/
87 changes: 87 additions & 0 deletions eventhubs/eventhubs_test.go
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft and contributors. All rights reserved.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

package eventhubs

import (
"context"
"log"
"os"
"testing"
"time"

"github.com/Azure-Samples/azure-sdk-for-go-samples/helpers"
"github.com/Azure-Samples/azure-sdk-for-go-samples/iam"
"github.com/Azure-Samples/azure-sdk-for-go-samples/resources"
)

const (
location = "westus2"
nsName = "ehtest-03-ns"
hubName = "ehtest-03-hub"

// for storage.LeaserCheckpointer
storageAccountName = "ehtest0001storage"
storageContainerName = "eventhubs0001leasercheckpointer"
)

func TestMain(m *testing.M) {
err := helpers.ParseArgs()
if err != nil {
log.Fatalf("failed to parse args: %v\n", err)
}

err = iam.ParseArgs()
if err != nil {
log.Fatalln("failed to parse IAM args")
}

os.Exit(m.Run())
}

func ExampleEventHubs() {
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
// defer goes in LIFO order
defer cancel()
defer resources.Cleanup(context.Background()) // cleanup can take a long time

// create group
var err error
helpers.SetResourceGroupName("eventhubstest")
_, err = resources.CreateGroup(ctx, helpers.ResourceGroupName())
if err != nil {
helpers.PrintAndLog(err.Error())
}
helpers.PrintAndLog("created group")

// create Event Hubs namespace
_, err = CreateNamespace(ctx, nsName)
if err != nil {
helpers.PrintAndLog(err.Error())
}
helpers.PrintAndLog("created namespace")

// create Event Hubs hub
_, err = CreateHub(ctx, nsName, hubName)
if err != nil {
helpers.PrintAndLog(err.Error())
}
helpers.PrintAndLog("created hub")

// send and receive messages
log.Printf("Send(ctx)\n")
Send(ctx, nsName, hubName)
log.Printf("Receive(ctx)\n")
Receive(ctx, nsName, hubName)
log.Printf("ReceiveViaEPH(ctx)\n")
ReceiveViaEPH(ctx, nsName, hubName, storageAccountName, storageContainerName)

// Output:
// created group
// created namespace
// created hub
// received: test-message
// received: test-message
}
39 changes: 39 additions & 0 deletions eventhubs/hub.go
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft and contributors. All rights reserved.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

package eventhubs

import (
"context"

"github.com/Azure-Samples/azure-sdk-for-go-samples/helpers"
"github.com/Azure-Samples/azure-sdk-for-go-samples/iam"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/to"
)

func getHubsClient() eventhub.EventHubsClient {
hubClient := eventhub.NewEventHubsClient(helpers.SubscriptionID())
auth, _ := iam.GetResourceManagementAuthorizer(iam.AuthGrantType())
hubClient.Authorizer = auth
hubClient.AddToUserAgent(helpers.UserAgent())
return hubClient
}

// CreateHub creates an Event Hubs hub in a namespace
func CreateHub(ctx context.Context, nsName string, hubName string) (eventhub.Model, error) {
hubClient := getHubsClient()
return hubClient.CreateOrUpdate(
ctx,
helpers.ResourceGroupName(),
nsName,
hubName,
eventhub.Model{
Properties: &eventhub.Properties{
PartitionCount: to.Int64Ptr(4),
},
},
)
}
47 changes: 47 additions & 0 deletions eventhubs/namespace.go
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft and contributors. All rights reserved.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

package eventhubs

import (
"context"

"github.com/Azure-Samples/azure-sdk-for-go-samples/helpers"
"github.com/Azure-Samples/azure-sdk-for-go-samples/iam"
"github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/to"
)

func getNamespacesClient() eventhub.NamespacesClient {
nsClient := eventhub.NewNamespacesClient(helpers.SubscriptionID())
auth, _ := iam.GetResourceManagementAuthorizer(iam.AuthGrantType())
nsClient.Authorizer = auth
nsClient.AddToUserAgent(helpers.UserAgent())
return nsClient
}

// CreateNamespace creates an Event Hubs namespace
func CreateNamespace(ctx context.Context, nsName string) (*eventhub.EHNamespace, error) {
nsClient := getNamespacesClient()
future, err := nsClient.CreateOrUpdate(
ctx,
helpers.ResourceGroupName(),
nsName,
eventhub.EHNamespace{
Location: to.StringPtr(helpers.Location()),
},
)
if err != nil {
return nil, err
}

err = future.WaitForCompletion(ctx, nsClient.Client)
if err != nil {
return nil, err
}

result, err := future.Result(nsClient)
return &result, err
}

0 comments on commit 891ca18

Please sign in to comment.