-
Notifications
You must be signed in to change notification settings - Fork 1
/
push.go
114 lines (98 loc) · 2.61 KB
/
push.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//go:build !test
package push
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/ZNotify/server/app/db/helper"
"github.com/ZNotify/server/app/global"
"github.com/ZNotify/server/app/manager/push/enum"
"github.com/ZNotify/server/app/manager/push/interfaces"
"github.com/ZNotify/server/app/manager/push/item"
"github.com/ZNotify/server/app/utils"
"go.uber.org/zap"
)
// Send hands msg to every sender in activeSenders, one goroutine per sender,
// and then waits on them through utils.WaitTimeout with a five second limit.
//
// It returns:
//   - an error with the text "send timeout" when utils.WaitTimeout returns true;
//   - an error whose message contains one newline-terminated line per failed
//     sender when at least one sender reported a failure;
//   - nil otherwise.
//
// Goroutines that are still running when the timeout path is taken are not
// cancelled here; they only stop early if the sender honours ctx.
func Send(ctx context.Context, msg *item.PushMessage) error {
	zap.S().Infof("Send message to %s", helper.GetReadableName(msg.User))

	var (
		// mu guards errs: the sender goroutines append to it concurrently, and
		// may still do so after this function has returned on the timeout path.
		mu   sync.Mutex
		errs []string
		wg   sync.WaitGroup
	)

	wg.Add(len(activeSenders))
	for _, v := range activeSenders {
		go func(sender interfaces.Sender) {
			defer wg.Done()

			pe := sender.Send(ctx, msg)
			if pe == nil {
				return
			}

			// Format outside the critical section; only the append needs the lock.
			errString := fmt.Sprintf("Send message to %s failed: %v", sender.Name(), pe)
			mu.Lock()
			errs = append(errs, errString)
			mu.Unlock()
		}(v)
	}

	if utils.WaitTimeout(&wg, 5*time.Second) {
		return errors.New("send timeout")
	}

	mu.Lock()
	defer mu.Unlock()

	if len(errs) == 0 {
		return nil
	}

	val := ""
	for _, v := range errs {
		val += v + "\n"
	}
	return errors.New(val)
}
// Init populates activeSenders from global.App.Config.Senders.
//
// The Senders config value is inspected with reflection: each field name is
// used as a sender identifier (enum.Sender) and the field value as that
// sender's configuration. For every field, Init:
//
//  1. reports a fatal error if the sender is already active or cannot be
//     obtained from GetSender;
//  2. initialises it according to the interface it implements:
//     - interfaces.SenderWithConfig: skipped as disabled when the field is the
//     zero value, otherwise Init is called with the field value;
//     - interfaces.SenderWithoutConfig: the field must be a bool; false means
//     disabled, true calls Init with no arguments;
//  3. calls Setup when the sender also implements interfaces.SenderWithBackground;
//  4. appends the sender to activeSenders.
//
// All failures are reported through zap's Fatalf. A fatal error is also logged
// when the whole Senders value is zero or when no sender ends up enabled.
//
// NOTE(review): a sender implementing neither SenderWithConfig nor
// SenderWithoutConfig is activated without any Init call — confirm this is
// intended.
func Init() {
	cfgSendersV := reflect.ValueOf(global.App.Config.Senders)
	if cfgSendersV.IsZero() {
		zap.S().Fatalf("No sender found in config")
	}

	cfgSendersT := cfgSendersV.Type()
	for k := 0; k < cfgSendersV.NumField(); k++ {
		senderName := cfgSendersT.Field(k).Name
		senderCfgField := cfgSendersV.Field(k)
		senderCfg := senderCfgField.Interface()

		if IsSenderActive(enum.Sender(senderName)) {
			zap.S().Fatalf("Sender %s load twice", senderName)
		}

		sender, err := GetSender(enum.Sender(senderName))
		if err != nil {
			zap.S().Fatalf("Failed to get sender %s: %v", senderName, err)
		}

		// Case order matters: a sender satisfying both interfaces is treated
		// as SenderWithConfig, matching the original if/else precedence.
		switch s := sender.(type) {
		case interfaces.SenderWithConfig:
			if senderCfgField.IsZero() {
				zap.S().Infof("Sender %s is disabled", senderName)
				continue
			}
			if err = s.Init(senderCfg); err != nil {
				zap.S().Fatalf("Sender %s init failed: %v", senderName, err)
			}
		case interfaces.SenderWithoutConfig:
			// Checked assertion: a non-bool config field is reported with the
			// sender name instead of an opaque runtime panic.
			enable, isBool := senderCfg.(bool)
			if !isBool {
				zap.S().Fatalf("Sender %s config must be a bool, got %T", senderName, senderCfg)
			}
			if !enable {
				zap.S().Infof("Sender %s is disabled", senderName)
				continue
			}
			if err = s.Init(); err != nil {
				zap.S().Fatalf("Sender %s init failed: %v", senderName, err)
			}
		}

		if host, ok := sender.(interfaces.SenderWithBackground); ok {
			if err = host.Setup(); err != nil {
				zap.S().Fatalf("Sender %s start failed: %v", sender.Name(), err)
			}
		}

		activeSenders = append(activeSenders, sender)
		zap.S().Infof("Sender %s is loaded", senderName)
	}

	if len(activeSenders) == 0 {
		zap.S().Fatalf("No sender enabled")
	}
}