This repository has been archived by the owner on Jan 17, 2022. It is now read-only.
/
queuehandler.go
104 lines (90 loc) · 3.32 KB
/
queuehandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package handler
import (
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Financial-Times/message-queue-go-producer/producer"
consumer "github.com/Financial-Times/message-queue-gonsumer"
"github.com/Financial-Times/methode-content-placeholder-mapper/v2/mapper"
"github.com/Financial-Times/methode-content-placeholder-mapper/v2/message"
"github.com/Financial-Times/methode-content-placeholder-mapper/v2/model"
log "github.com/sirupsen/logrus"
)
// MessageHandler is implemented by types that can process a single consumed
// queue message and that can start the message-handling process.
type MessageHandler interface {
// HandleMessage processes one message received from the consumer.
HandleMessage(msg consumer.Message)
// StartHandlingMessages starts handling messages.
StartHandlingMessages()
}
// CPHMessageHandler is a MessageHandler that maps messages read from a queue
// and publishes the mapped results to a producer.
type CPHMessageHandler struct {
// MessageConsumer is the queue consumer that StartHandlingMessages starts and stops.
MessageConsumer consumer.MessageConsumer
// messageProducer receives the publication event messages built by HandleMessage.
messageProducer producer.MessageProducer
// nativeMapper maps the raw message body in HandleMessage.
nativeMapper mapper.MessageToContentPlaceholderMapper
// cphMapper transforms the result of nativeMapper into the contents to publish.
cphMapper mapper.CPHAggregateMapper
// messageCreator builds the publication event message for each transformed content.
messageCreator message.MessageCreator
}
// NewCPHMessageHandler returns a CPHMessageHandler wired with the given
// consumer, producer, mappers and message creator.
func NewCPHMessageHandler(c consumer.MessageConsumer,
	p producer.MessageProducer,
	mapper mapper.CPHAggregateMapper,
	nativeMapper mapper.MessageToContentPlaceholderMapper,
	messageCreator message.MessageCreator) *CPHMessageHandler {
	handler := new(CPHMessageHandler)

	// Queue endpoints.
	handler.MessageConsumer = c
	handler.messageProducer = p

	// Mapping and message-building collaborators.
	handler.cphMapper = mapper
	handler.nativeMapper = nativeMapper
	handler.messageCreator = messageCreator

	return handler
}
// HandleMessage maps one consumed message and forwards the results to the
// producer.
//
// Messages whose "Origin-System-Id" header differs from model.MethodeSystemID
// are ignored. The "Message-Timestamp" header is passed to the mapper as the
// last-modified value; when the header is absent the current time, formatted
// with model.UPPDateFormat, is used instead. Every failure is logged together
// with the "X-Request-Id" header as transaction_id, and processing of the
// message stops at the first failure (contents already sent stay sent).
func (kqh *CPHMessageHandler) HandleMessage(msg consumer.Message) {
	tid := msg.Headers["X-Request-Id"]
	logEntry := log.WithField("transaction_id", tid)

	if originSystemID := msg.Headers["Origin-System-Id"]; originSystemID != model.MethodeSystemID {
		logEntry.WithField("Origin-System-Id", originSystemID).Info("Ignoring message with different Origin-System-Id")
		return
	}

	lmd, ok := msg.Headers["Message-Timestamp"]
	if !ok {
		lmd = time.Now().Format(model.UPPDateFormat)
	}

	methodePlaceholder, err := kqh.nativeMapper.Map([]byte(msg.Body))
	if err != nil {
		if _, ok := err.(*model.InvalidMethodeCPH); ok {
			// Logged at info level, unlike other mapping errors. Use Info
			// rather than Infof so that a '%' in the error text is not
			// interpreted as a formatting verb.
			logEntry.WithError(err).Info(err.Error())
		} else {
			logEntry.WithError(err).Error("Error creating methode model from queue message")
		}
		return
	}

	transformedContents, err := kqh.cphMapper.MapContentPlaceholder(methodePlaceholder, tid, lmd)
	if err != nil {
		logEntry.WithError(err).Error("Error transforming content")
		return
	}

	for _, transformedContent := range transformedContents {
		contentEntry := logEntry.WithField("uuid", transformedContent.GetUUID())

		eventMessage, err := kqh.messageCreator.ToPublicationEventMessage(transformedContent.GetUppCoreContent(), transformedContent)
		if err != nil {
			contentEntry.WithError(err).Warn("Error creating transformed content message to queue")
			return
		}

		// Attach the actual send error to the log entry; the creation error
		// checked above is always nil at this point.
		if err := kqh.messageProducer.SendMessage("", *eventMessage); err != nil {
			contentEntry.WithError(err).Warn("Error sending transformed content message to queue")
			return
		}

		contentEntry.Info("Content mapped and sent to the queue")
	}
}
// StartHandlingMessages starts the message consumer in a separate goroutine
// and blocks until the process receives SIGINT or SIGTERM. It then stops the
// consumer and waits for the consumer goroutine to return.
func (kqh *CPHMessageHandler) StartHandlingMessages() {
	log.Info("Starting queue consumer...")

	var consumerWaitGroup sync.WaitGroup
	consumerWaitGroup.Add(1)
	go func() {
		// Deferred so the wait below is released even if Start panics.
		defer consumerWaitGroup.Done()
		kqh.MessageConsumer.Start()
	}()

	// signal.Notify does not block when sending to the channel, so the
	// channel must be buffered or a signal arriving before the receive below
	// is ready could be dropped.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(ch)

	<-ch

	kqh.MessageConsumer.Stop()
	consumerWaitGroup.Wait()
}