/
main.go
139 lines (115 loc) · 3.51 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
Copyright 2021 Adevinta
*/
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"os"
"sync"
metrics "github.com/adevinta/vulcan-metrics-client"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ses"
_ "github.com/lib/pq"
log "github.com/sirupsen/logrus"
"github.com/adevinta/vulcan-reports-generator/pkg/api"
"github.com/adevinta/vulcan-reports-generator/pkg/model"
"github.com/adevinta/vulcan-reports-generator/pkg/notify"
"github.com/adevinta/vulcan-reports-generator/pkg/queue"
"github.com/adevinta/vulcan-reports-generator/pkg/report"
"github.com/adevinta/vulcan-reports-generator/pkg/storage"
)
const (
pg = "postgres"
pgConStrFmt = "host=%s port=%s user=%s password=%s dbname=%s sslmode=%s"
defRegion = "eu-west-1"
)
func main() {
// Read config.
cfgFilePath := flag.String("c", "./config.toml", "configuration file")
flag.Parse()
conf, err := parseConfig(*cfgFilePath)
if err != nil {
log.WithError(err).Fatal("Error reading configuration")
}
logger := setupLogger(*conf)
// Build AWS session.
awsSess := session.Must(session.NewSession())
// Build notifier.
if conf.SES.Region == "" {
conf.SES.Region = defRegion
}
notifier, err := notify.NewSESNotifier(conf.SES, ses.New(awsSess, &aws.Config{
Region: aws.String(conf.SES.Region),
}))
if err != nil {
logger.WithError(err).Fatal("Error creating notifier")
}
// Build metrics client.
metricsClient, err := metrics.NewClient()
if err != nil {
logger.WithError(err).Fatal("Error creating metrics client")
}
// Build DB.
connStr := fmt.Sprintf(pgConStrFmt, conf.DB.Host, conf.DB.Port,
conf.DB.User, conf.DB.Pass, conf.DB.Name, conf.DB.SSLMode)
db, err := sql.Open(pg, connStr)
if err != nil {
logger.WithError(err).Fatal("Error connecting to DB")
}
defer db.Close()
// Build generate Use Cases.
generateUCC := map[model.ReportType]report.GenerateUC{}
repositories := map[model.ReportType]storage.ReportsRepository{}
for t, gconf := range conf.Generators {
typ := model.ReportType(t)
g, err := report.NewGenerator(t, gconf, logger, db)
if err != nil {
logger.WithError(err).WithFields(
log.Fields{"type": t},
).Fatal("Error building generator")
}
r, err := storage.NewReportsRepository(t, db)
if err != nil {
logger.WithError(err).WithFields(
log.Fields{"type": t},
).Fatal("Error building repository")
}
uc, err := report.NewGenerateUC(typ, logger, g, r)
if err != nil {
logger.WithError(err).WithFields(
log.Fields{"type": t},
).Fatal("Error building generate UC")
}
generateUCC[typ] = uc
repositories[typ] = r
}
// Build processor.
processor, err := report.NewProcessor(logger, generateUCC, notifier, metricsClient)
if err != nil {
logger.WithError(err).Fatal("Error creating queue processor")
}
sqsConsumerGroup, err := queue.NewSQSConsumerGroup(conf.SQS.NProcessors, conf.SQS.SQSConfig, processor, logger)
if err != nil {
logger.WithError(err).Fatal("Error creating queue consumer group")
}
// Build and start API.
api := api.NewReportsAPI(api.NewReportsService(logger, notifier, repositories))
go api.Start(conf.API.Port)
// Start Consumer group.
var wg sync.WaitGroup
sqsConsumerGroup.Start(context.Background(), &wg)
logger.Info("Started")
wg.Wait()
}
func setupLogger(cfg config) *log.Logger {
var logger = log.New()
logger.SetFormatter(&log.JSONFormatter{})
logger.SetOutput(os.Stdout)
logger.SetLevel(parseLogLevel(cfg.Log.Level))
logger.SetReportCaller(cfg.Log.AddCaller)
return logger
}