-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
50 lines (45 loc) · 1.29 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main
import (
"bytes"
"encoding/binary"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
)
// Fixed settings of the notification service. They are hard-coded here;
// adminEmail and adminName could instead be loaded from config or a DB.
const (
// topic is the name handed to getConsumer in main: the topic to consume from.
topic = "to-notify-topic"
// adminEmail is the recipient address passed to sendEmail for every alert.
adminEmail = "dluumi0ke@relay.firefox.com"
// adminName is the name of the admin, passed to sendEmail next to adminEmail.
adminName = "Camille"
)
// main runs the notification service. It obtains a consumer for topic and
// then loops forever: every received message is decoded into an alert id,
// passed through getAlertEventTime, sendEmail and checkAlert, in that order.
// The process terminates when the consumer reports a kafka.Error.
func main() {
	// Upper bound, in milliseconds, on how long one Poll call may block.
	// With a timeout of 0 Poll returns immediately, which turns the loop
	// below into a busy-wait that pins a CPU core while the topic is idle.
	const pollTimeoutMs = 100

	log.Println("~~~ Notification service started ~~~")

	// Consume queue of alerts
	consumer := getConsumer(topic)
	defer consumer.Close()

	for {
		// Poll yields nil when the timeout expires; nil matches none of the
		// cases below, so the loop simply polls again.
		ev := consumer.Poll(pollTimeoutMs)
		switch e := ev.(type) {
		case *kafka.Message:
			alertId := getAlertId(e.Value)
			// Retrieve the record
			ts := getAlertEventTime(alertId)
			// Send email to admin
			// NOTE(review): the alert is marked as sent right after this call
			// regardless of its outcome — confirm sendEmail handles failures.
			sendEmail(adminEmail, adminName, ts, alertId)
			// Update status of alert to sent
			checkAlert(alertId)
		case kafka.PartitionEOF:
			log.Printf("%% Reached %v\n", e)
		case kafka.Error:
			// log.Fatalf exits through os.Exit, which skips deferred calls,
			// so the consumer has to be closed explicitly before exiting.
			consumer.Close()
			log.Fatalf("%% Error: %v\n", e)
		}
	}
}
// getAlertId turns the wire representation of an alert id into an int.
//
// The first 8 bytes of value are read as a little-endian uint64; any bytes
// after those are ignored. When value holds fewer than 8 bytes the failure is
// logged with log.Fatalf, which terminates the process.
func getAlertId(value []byte) int {
	var decoded uint64

	err := binary.Read(bytes.NewReader(value), binary.LittleEndian, &decoded)
	if err != nil {
		log.Fatalf("failed to decode alert id: %v\n", err)
	}

	return int(decoded)
}