-
Notifications
You must be signed in to change notification settings - Fork 1
/
kafkaProducer.go
executable file
·63 lines (57 loc) · 1.66 KB
/
kafkaProducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package kafka
import (
"github.com/Shopify/sarama"
"github.com/commitHub/commitBlockchain/codec"
)
// NewProducer builds a synchronous sarama producer for the given broker
// addresses.
//
// A nil config is handed to sarama, so all producer settings are left to the
// library. The function has no error return: if the producer cannot be
// created it panics with the error reported by sarama.
func NewProducer(kafkaPorts []string) sarama.SyncProducer {
	syncProducer, err := sarama.NewSyncProducer(kafkaPorts, nil)
	if err != nil {
		// Construction failure is treated as fatal by this constructor.
		panic(err)
	}
	return syncProducer
}
// KafkaProducerDeliverMessage JSON-encodes msg with cdc and publishes the
// resulting bytes to topic through producer.
//
// It returns a non-nil error when either the encoding or the send fails, and
// nil otherwise. The partition and offset reported by the producer are
// discarded.
func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error {
	payload, err := cdc.MarshalJSON(msg)
	if err != nil {
		// Surface encoding failures through the error return instead of
		// panicking, so callers handle them the same way as send failures.
		return err
	}

	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
	})
	return err
}
// SendToKafka publishes msg on the topic named "Topic", stores a status entry
// for the message's ticket via SetTicketIDtoDB, and returns the JSON-encoded
// ticket ID.
//
// The stored status depends on whether delivery succeeded:
//   - delivery failed:    a JSON object with a "response" field asking the
//     operator to restart rest and kafka;
//   - delivery succeeded: a JSON object with an "error" field saying the
//     request is still in process.
//
// NOTE(review): the failure status is stored under "response" and the
// in-process status under "error"; confirm this matches what readers of the
// ticket store expect.
//
// The function panics if any of the JSON encodings fails.
func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte {
	// mustJSON encodes v with cdc and panics on failure.
	mustJSON := func(v interface{}) []byte {
		bz, marshalErr := cdc.MarshalJSON(v)
		if marshalErr != nil {
			panic(marshalErr)
		}
		return bz
	}

	var status []byte
	if deliverErr := KafkaProducerDeliverMessage(msg, "Topic", kafkaState.Producer, cdc); deliverErr != nil {
		status = mustJSON(struct {
			Response string `json:"response"`
		}{Response: "Something is up with kafka server, restart rest and kafka."})
	} else {
		status = mustJSON(struct {
			Error string `json:"error"`
		}{Error: "Request in process, wait and try after some time"})
	}
	SetTicketIDtoDB(msg.TicketID, kafkaState.KafkaDB, cdc, status)

	// Whatever the delivery outcome, the caller receives the ticket ID.
	return mustJSON(struct {
		TicketID Ticket `json:"ticketID"`
	}{TicketID: msg.TicketID})
}