forked from actionpay/postmanq
/
service.go
111 lines (91 loc) · 2.79 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package limiter
import (
"gopkg.in/yaml.v3"
"github.com/Halfi/postmanq/common"
"github.com/Halfi/postmanq/logger"
)
// Service сервис ограничений, следит за тем, чтобы почтовым сервисам не отправилось больше писем, чем нужно
type Service struct {
// LimitersCount количество горутин проверяющих количество отправленных писем
LimitersCount int `yaml:"workers"`
Configs map[string]*Config `yaml:"postmans"`
cleaner *Cleaner
events chan *common.SendEvent
eventsClosed bool
}
// Inst создает сервис ограничений
func Inst() common.SendingService {
return &Service{}
}
// OnInit инициализирует сервис
func (s *Service) OnInit(event *common.ApplicationEvent) {
logger.All().Debug("init limits...")
err := yaml.Unmarshal(event.Data, s)
if err != nil {
logger.All().ErrErr(err)
}
if len(s.Configs) == 0 {
logger.All().FailExit("limiter config is empty")
return
}
s.events = make(chan *common.SendEvent)
s.eventsClosed = false
s.cleaner = newCleaner(s)
for name, config := range s.Configs {
s.init(config, name)
}
if s.LimitersCount == 0 {
s.LimitersCount = common.DefaultWorkersCount
}
}
func (s *Service) init(conf *Config, hostname string) {
// инициализируем ограничения
for host, limit := range conf.Limits {
if limit.duration == 0 {
delete(conf.Limits, host)
logger.By(hostname).Warn("wrong limits settings")
}
limit.init()
logger.By(hostname).Debug("create limit for %s with type %v and duration %v", host, limit.bindingType, limit.duration)
}
}
// OnRun запускает проверку ограничений и очистку значений лимитов
func (s *Service) OnRun() {
// сразу запускаем проверку значений ограничений
go s.cleaner.run()
for i := 0; i < s.LimitersCount; i++ {
go newLimiter(i+1, s)
}
}
// Event send event
func (s *Service) Event(ev *common.SendEvent) bool {
if s.eventsClosed {
return false
}
s.events <- ev
return true
}
// OnFinish завершает работу сервиса соединений
func (s *Service) OnFinish() {
if !s.eventsClosed {
s.eventsClosed = true
close(s.events)
s.cleaner.stop()
s.cleaner = nil
}
}
func (s *Service) getLimit(hostnameFrom, hostnameTo string) *Limit {
if config, ok := s.Configs[hostnameFrom]; ok {
if limit, has := config.Limits[hostnameTo]; has {
return limit
} else {
return nil
}
} else {
return nil
}
}
type Config struct {
// ограничения для почтовых сервисов, в качестве ключа используется домен
Limits map[string]*Limit `yaml:"limits"`
}