/
queue_handler.go
81 lines (67 loc) · 2.64 KB
/
queue_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"encoding/json"
"github.com/Financial-Times/annotations-rw-neo4j/v3/annotations"
logger "github.com/Financial-Times/go-logger/v2"
"github.com/Financial-Times/kafka-client-go/kafka"
"github.com/Financial-Times/transactionid-utils-go"
"github.com/pkg/errors"
)
// queueHandler consumes messages from a Kafka queue, writes the annotations
// they carry through annotationsService and, when a producer is configured,
// forwards each successfully written message to the next queue.
type queueHandler struct {
// annotationsService receives the decoded annotations via its Write method.
annotationsService annotations.Service
// consumer is the message source; Ingest registers its handler on it.
consumer kafka.Consumer
// producer is optional: when nil, messages are not forwarded after the write.
producer kafka.Producer
// originMap maps the value of the "Origin-System-Id" header to an annotation lifecycle.
originMap map[string]string
// lifecycleMap maps an annotation lifecycle to its platform version.
lifecycleMap map[string]string
// messageType is included in monitoring log events and in the success log message.
messageType string
// log is the logger used for monitoring events and debug output.
log *logger.UPPLogger
}
// queueMessage is the structure the body of an incoming message is decoded
// into with encoding/json. The fields carry no JSON tags, so keys are matched
// against the field names (case-insensitively).
//
// Note: this will only work for annotation messages, and not for suggestions.
type queueMessage struct {
// UUID is passed to the annotations service as the first argument of Write.
UUID string
// Annotations is the set of annotations written for that UUID.
Annotations annotations.Annotations
}
// Ingest registers the message handler on the consumer via StartListening.
//
// For every received message the handler:
//  1. reads the transaction id and the "Origin-System-Id" headers (both are
//     required; a missing header yields an error),
//  2. resolves the annotation lifecycle and platform version for the origin
//     system through getSourceFromHeader,
//  3. decodes the message body into a queueMessage,
//  4. writes the annotations through annotationsService.Write, and
//  5. if a producer is configured, forwards the original message to it.
//
// The handler returns a non-nil error when any of these steps fails; what the
// consumer does with that error is up to the kafka client.
func (qh *queueHandler) Ingest() {
	qh.consumer.StartListening(func(message kafka.FTMessage) error {
		tid, found := message.Headers[transactionidutils.TransactionIDHeader]
		if !found {
			return errors.New("Missing transaction id from message")
		}

		originSystem, found := message.Headers["Origin-System-Id"]
		if !found {
			return errors.New("Missing Origin-System-Id header from message")
		}

		lifecycle, platformVersion, err := qh.getSourceFromHeader(originSystem)
		if err != nil {
			return err
		}

		// annMsg is already a pointer, so it is passed to Unmarshal directly.
		annMsg := new(queueMessage)
		if err = json.Unmarshal([]byte(message.Body), annMsg); err != nil {
			// Keep the decoding error as the cause so malformed payloads can be diagnosed.
			return errors.Wrapf(err, "Cannot process received message %s", tid)
		}

		err = qh.annotationsService.Write(annMsg.UUID, lifecycle, platformVersion, tid, annMsg.Annotations)
		if err != nil {
			// The monitoring event is emitted here because this is the only place
			// that knows the event name and message type; the error is still
			// returned so the consumer sees the failure.
			qh.log.WithMonitoringEvent("SaveNeo4j", tid, qh.messageType).WithUUID(annMsg.UUID).WithError(err).Error("Cannot write to Neo4j")
			return errors.Wrapf(err, "Failed to write message with tid=%s and uuid=%s", tid, annMsg.UUID)
		}
		qh.log.WithMonitoringEvent("SaveNeo4j", tid, qh.messageType).WithUUID(annMsg.UUID).Infof("%s successfully written in Neo4j", qh.messageType)

		// Forward the untouched original message to the next queue, if one is configured.
		if qh.producer != nil {
			qh.log.WithTransactionID(tid).WithUUID(annMsg.UUID).Debug("Forwarding message to the next queue")
			return qh.producer.SendMessage(message)
		}
		return nil
	})
}
// getSourceFromHeader translates an origin system id into the pair
// (annotation lifecycle, platform version).
//
// The lifecycle is looked up in originMap by originSystem; the platform
// version is then looked up in lifecycleMap by that lifecycle. If either
// lookup misses, both strings are returned empty together with an error
// naming the key that could not be resolved.
func (qh *queueHandler) getSourceFromHeader(originSystem string) (string, string, error) {
	lifecycle, ok := qh.originMap[originSystem]
	if !ok {
		return "", "", errors.Errorf("Annotation Lifecycle not found for origin system id: %s", originSystem)
	}

	if version, ok := qh.lifecycleMap[lifecycle]; ok {
		return lifecycle, version, nil
	}

	return "", "", errors.Errorf("Platform version not found for origin system id: %s and annotation lifecycle: %s", originSystem, lifecycle)
}