/
main.go
64 lines (50 loc) · 1.93 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main
import (
"flag"
"github.com/Shopify/sarama"
"github.com/martin-helmich/cloudnativego-backend/src/bookingservice/listener"
"github.com/martin-helmich/cloudnativego-backend/src/bookingservice/rest"
"github.com/martin-helmich/cloudnativego-backend/src/lib/configuration"
"github.com/martin-helmich/cloudnativego-backend/src/lib/msgqueue"
msgqueue_amqp "github.com/martin-helmich/cloudnativego-backend/src/lib/msgqueue/amqp"
"github.com/martin-helmich/cloudnativego-backend/src/lib/msgqueue/kafka"
"github.com/martin-helmich/cloudnativego-backend/src/lib/persistence/dblayer"
"github.com/streadway/amqp"
)
// panicIfErr aborts the program by panicking with err when err is non-nil.
// A nil err is a no-op, which lets callers chain it directly after any
// fallible startup step.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// main wires up the booking service: it reads the configuration, connects to
// the configured message broker (AMQP or Kafka) to obtain an event listener
// and an event emitter, creates the persistence layer, starts event
// processing in a separate goroutine and then hands control to the REST API.
//
// Flags:
//
//	-conf  path to the configuration JSON file
//	       (default "./configuration/config.json")
//
// Any error while setting up the broker or the persistence layer aborts
// startup with a panic, as does an unrecognised message broker type.
func main() {
	var eventListener msgqueue.EventListener
	var eventEmitter msgqueue.EventEmitter

	confPath := flag.String("conf", "./configuration/config.json", "flag to set the path to the configuration json file")
	flag.Parse()

	// Extract configuration.
	// NOTE(review): the error is intentionally left unchecked here, as before —
	// presumably ExtractConfiguration still returns usable defaults when the
	// file cannot be read; confirm before making this fatal.
	config, _ := configuration.ExtractConfiguration(*confPath)

	switch config.MessageBrokerType {
	case "amqp":
		conn, err := amqp.Dial(config.AMQPMessageBroker)
		panicIfErr(err)

		// Listener and emitter share the same AMQP connection and exchange.
		eventListener, err = msgqueue_amqp.NewAMQPEventListener(conn, "events", "booking")
		panicIfErr(err)

		eventEmitter, err = msgqueue_amqp.NewAMQPEventEmitter(conn, "events")
		panicIfErr(err)
	case "kafka":
		conf := sarama.NewConfig()
		conf.Producer.Return.Successes = true

		conn, err := sarama.NewClient(config.KafkaMessageBrokers, conf)
		panicIfErr(err)

		eventListener, err = kafka.NewKafkaEventListener(conn, []int32{})
		panicIfErr(err)

		eventEmitter, err = kafka.NewKafkaEventEmitter(conn)
		panicIfErr(err)
	default:
		panic("Bad message broker type: " + config.MessageBrokerType)
	}

	// Abort startup if the persistence layer cannot be created, rather than
	// handing an unusable handler to the event processor and the REST API.
	dbhandler, err := dblayer.NewPersistenceLayer(config.Databasetype, config.DBConnection)
	panicIfErr(err)

	// Consume incoming events concurrently with serving HTTP requests.
	processor := listener.EventProcessor{eventListener, dbhandler}
	go processor.ProcessEvents()

	rest.ServeAPI(config.RestfulEndpoint, dbhandler, eventEmitter)
}