-
Notifications
You must be signed in to change notification settings - Fork 1
/
PulsarLib.go
73 lines (64 loc) · 1.65 KB
/
PulsarLib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package PulsarLib
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
//Default configuration URL: "pulsar://localhost:6650"
func InitClient(URL string) *pulsar.Client {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: URL,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
return &client
}
func CreateConsumer(client *pulsar.Client, topic string, sub_name string) *pulsar.Consumer {
consumer, err := (*client).Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: sub_name,
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
return &consumer
}
func ReceiveMessage(consumer *pulsar.Consumer) []byte {
msg, err := (*consumer).Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
(*consumer).Ack(msg)
return msg.Payload()
}
func DestroyConsumer(consumer *pulsar.Consumer) {
if err := (*consumer).Unsubscribe(); err != nil {
log.Fatal(err)
}
}
func CreateProducer(client *pulsar.Client, topic string) *pulsar.Producer {
producer, err := (*client).CreateProducer(pulsar.ProducerOptions{
Topic: topic,
})
if err != nil {
log.Fatal(err)
}
return &producer
}
func SendMessage(producer *pulsar.Producer, message []byte) {
_, err := (*producer).Send(context.Background(), &pulsar.ProducerMessage{
Payload: message,
})
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
}