forked from Azure/azure-event-hubs-go
/
main.go
113 lines (98 loc) · 2.57 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/Azure/azure-event-hubs-go"
"github.com/Azure/azure-amqp-common-go/aad"
mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub"
"github.com/Azure/go-autorest/autorest/azure"
azauth "github.com/Azure/go-autorest/autorest/azure/auth"
)
const (
Location = "eastus"
ResourceGroupName = "ehtest"
HubName = "producerConsumer"
)
func main() {
hub, partitions := initHub()
exit := make(chan struct{})
handler := func(ctx context.Context, event *eventhub.Event) error {
text := string(event.Data)
if text == "exit\n" {
fmt.Println("Oh snap!! Someone told me to exit!")
exit <- *new(struct{})
} else {
fmt.Println(string(event.Data))
}
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
for _, partitionID := range partitions {
hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
}
cancel()
fmt.Println("I am listening...")
select {
case <-exit:
fmt.Println("closing after 2 seconds")
select {
case <-time.After(2 * time.Second):
return
}
}
}
func initHub() (*eventhub.Hub, []string) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
hubMgmt, err := ensureEventHub(context.Background(), HubName)
if err != nil {
log.Fatal(err)
}
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Fatal(err)
}
hub, err := eventhub.NewHub(namespace, HubName, provider)
if err != nil {
panic(err)
}
return hub, *hubMgmt.PartitionIds
}
func mustGetenv(key string) string {
v := os.Getenv(key)
if v == "" {
panic("Environment variable '" + key + "' required for integration tests.")
}
return v
}
func ensureEventHub(ctx context.Context, name string) (*mgmt.Model, error) {
namespace := mustGetenv("EVENTHUB_NAMESPACE")
client := getEventHubMgmtClient()
hub, err := client.Get(ctx, ResourceGroupName, namespace, name)
partitionCount := int64(4)
if err != nil {
newHub := &mgmt.Model{
Name: &name,
Properties: &mgmt.Properties{
PartitionCount: &partitionCount,
},
}
hub, err = client.CreateOrUpdate(ctx, ResourceGroupName, namespace, name, *newHub)
if err != nil {
return nil, err
}
}
return &hub, nil
}
func getEventHubMgmtClient() *mgmt.EventHubsClient {
subID := mustGetenv("AZURE_SUBSCRIPTION_ID")
client := mgmt.NewEventHubsClientWithBaseURI(azure.PublicCloud.ResourceManagerEndpoint, subID)
a, err := azauth.NewAuthorizerFromEnvironment()
if err != nil {
log.Fatal(err)
}
client.Authorizer = a
return &client
}